금일은 ExecutorService 의 종료 방법과, Executor 스레드 풀관리 에 대해 알아보는 시간입니다.
서버 애플리케이션이 주문 로직을 처리한다고 가정할때 서버를 종료해야 된다면 진행중인 주문은 모두 완료한 다음에 새로운 주문 요청은 막아 서비스를 안정적으로 종료하는 것을 graceful shutdown 이라고 한다.
ExecutorService에는 종료와 관련된 다야한 메서드가 존재한다
void shutdown()
List shutdownNow()
boolena isShutdown()
boolean isTerminated()
shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한다.boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException
close()
shutdown() 과 같은 기능을 제공합니다. shutdownNow() 를 호출합니다. shutdown() 을 호출합니다.
Executor Service 는 새로운 요청을 거절한다.
거절시 java.util.concurrent.RejectedExecutionException 예외가 발생합니다.
shutdown()
shutdown() 을 호출합니다.shutdownNow()
List<Runnable> runnables = es.shutdownNow()close() 함수의 경우 하루를 기다려도 작업이 완료되지 않으면 shutdownNow() 를 호출하게 되도록 구현되어 있지만 빠른 종료를 위해 shutdownNow() 를 사용하여 사용자가 지정한 시간에 종료되도록 설정합니다.
해당 작업을 shutdown 하는 코드는 친절하게도 ExecutorServiced interface 의 공식 몌뉴얼을 보게되면 나와있습니다.

public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs); // 작업 시간 시뮬레이션
log(name + " 완료");
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("longTask", 100_000)); // 100초 대기
printState(es);
log("== shutdown 시작");
shutdownAndAwaitTermination(es);
log("== shutdown 완료");
printState(es);
}
private static void shutdownAndAwaitTermination(ExecutorService es) {
es.shutdown(); // non-blocking, 새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미 대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
try {
// 이미 대기중인 작업들을 모두 완료할 때 까지 10초 기다린다.
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
// 정상 종료가 너무 오래 걸리면...
log("서비스 정상 종료 실패 -> 강제 종료 시도");
es.shutdownNow();
// 작업이 취소될 때 까지 대기한다.
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
log("서비스가 종료되지 않았습니다.");
}
}
} catch (InterruptedException e) {
// awaitTermination()으로 대기중인 현재 스레드가 인터럽트 될 수 있다.
es.shutdownNow();
}
}
if (!es.awaitTermination(10, TimeUnit.SECONDS)) 해당 조건문은 해당 스레드가 인터럽트를 못받도록 설계되어 있다면 개발자에게 알려주거나 java 를 강제로 종료하는 방법 이있습니다.
catch (InterruptedException e) 해당 코드는 현재 스레드에 인터럽트를 걸게 된다면 바로 종료되도록 구현 가능합니다.
shutdownNow() 메서드가 실행되게 된다면 RunnableTask 함수 안의 longTask 작업이 run() 메서드의 sleep 메서드에 의해서 InterruptException 이 발생하게 됩니다.
두번째 강제 종료인 es.shutdownNow() 함수 호출시에 10초를 기다리는 이유는 극단적이지만 인터럽트 후에 자원을 정리하는 작업을 기다려 주기 때문입니다.
서비스를 종료할 때 기본적으로 우아한 종료를 선택하고 우아한 종료가 되지 않으면 그다음 일정 시간 후에 강제 종료 하는 방식으로 접근하는게 좋을 것 같습니다.
해당 부분은 대량의 요청을 별도의 스레드에서 어떤식으로 처리해야 하는지 알아갈 수 있습니다.
ThreadPoolExecutor 해당 ExecutorService 의 기본 구현체의 속성에 대해 저번 시간에 알아보았습니다.
corePoolSize , maximumPoolsize 의 차이를 알아보겠습니다.
ExecutorUtils 에 해당 method 추가
public static void printState(ExecutorService executorService, String taskName) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queuedTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log(taskName + " -> [pool=" + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
Main Method
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(2, 4,
3000, TimeUnit.MILLISECONDS, workQueue);
printState(es);
es.execute(new RunnableTask("task1"));
printState(es, "task1");
es.execute(new RunnableTask("task2"));
printState(es, "task2");
es.execute(new RunnableTask("task3"));
printState(es, "task3");
es.execute(new RunnableTask("task4"));
printState(es, "task4");
es.execute(new RunnableTask("task5"));
printState(es, "task5");
es.execute(new RunnableTask("task6"));
printState(es, "task6");
try {
es.execute(new RunnableTask("task7"));
} catch (RejectedExecutionException e) {
log("task7 실행 거절 예외 발생: " + e);
}
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
sleep(3000);
log("== maximumPoolSize 대기 시간 초과 ==");
printState(es);
es.close();
log("== shutdown 완료 ==");
printState(es);
}
해당 코드를 실행하게 되면 task 2번까지 active 의 실행중인 스레드는 2개이며 task 4번까지 실행하면 queue 에 2개의 작업이 들어가게 됩니다.
작업을 6까지 실행하게 되면 maximumPool = 4 이므로 pool size 가 4까지 증가 하게 됩니다.
그다음 task 7번을 실행하게 되면 예외가 발생하게 됩니다. RejectedExecutionException
maximumPoolSize 가 지정된 시간인 3초가 지나게 되면 기보 PoolSize = 2 로 변하게 됩니다.
마지막으로 close() 함수가 호출되게 되면 원래의 기본스레드 = 2도 0으로 변하게 됩니다.
위의 내용을 순서 상으로 정리해보겠습니다.
Executor 는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.
core 사이즈 만큼 스레드가 이미 만들어져 있고, 스레드 풀에 사용할 수 있는 스레드가 없으면 이 경우 큐에 작
업을 보관한다.
큐가 가득차게 되면 초과 스레드를 만들어서 작업을 수행한다.
스레드 풀의 스레드도 max 사이즈 만큼 가득찼다면 큐에 작업도 가득ㅊ찬 상태이므로 RejectedExecutionException 예외가 발생합니다.
스레드가 차례로 작업을 완료하고 스레드 풀에 대기 상태로 돌아가게 됩니다.
스레드 풀의 스레드는 큐의 데이터를 획득하기 위해 대기한다.
초과 스레드들은 지정된 시간까지 작업ㅇ르 하지 않고 대기하며 제거 됩니다.
응답시간이 아주 중요한 서버라면 요청을 받기 전에 스레드를 스레드 풀에 미리 생성해 둘 수 있습니다.
이렇게 되면 스레드 생성 시간을 줄일 수 있습니다.
ThreadPoolExecutor.prestartAllCoreThreads() 을 사용하면 기본 스레드를 미리 생성할 수 있습니다.
ExecutorService 는 해당 메서드를 제공하지는 않습니다.
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1000);
printState(es);
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
poolExecutor.prestartAllCoreThreads();
printState(es);
}