서버 애플리케이션이 고객의 주문을 처리하고 있는 도중에 갑자기 서버 재시작되면 고객의 주문이 제대로 진행되지 않으므로 이상적인 방향으로 새로운 주문 요청은 막고 이미 진행 중인 주문은 완료한 후 서버를 재시작 하는 것입니다.
이렇게 문제없이 우아하게 종료하는 방식을 graceful shutdown 이라 합니다.
void shutdown
(non-blocking -> 대기하지 않고 즉시 다음 코드를 호출)
List<Runnable> shutdownNow()
(non-blocking)
close()
shutdownNow()
를 호출합니다.shutdownNow()
를 호출서비스 상태를 확인하는 메서드로 boolean isShutdown()
, boolean isTerminated()
가 있습니다. 서비스가 종료되었는지 확인, shutdown()
, shutdownNow()
호출 후, 모든 작업이 완료되었는지 확인
서비스를 종료하는데 메인 메서드가 shutdown을 호출한다고 해서 이 스레드 풀이 완전히 종료될 때까지 기다리지 않고 다음 로직을 호출합니다. 근데 스레드풀이 완전히 종료될 때까지 기다리겠다고 하면 boolean awaitTermination(long timeout, TimeUnit unit)
(blocking) 하면 됩니다.
서비스 종료 시 모든 작업이 완료될 때까지 대기합니다. 이때 지정된 시간까지만 대기.
서비스가 너무 늦게 종료되거나, 종료되지 않는 문제가 발생할 수 있습니다. 그러면 우아하게 종료하는 시간을 정해서 정해진 시간 이후에는 무언가 문제가 있다고 가정하고 shutdownNow()
를 호출해서 작업들을 강제로 종료합니다.
close()
는 하루를 기다리는데 문제가 생겨서 돌아가는 의미없는 하루를 보내다 종료하기 때문에 디테일하게 shutdown()
과 shutdownNow()
를 호출해야 합니다.
실행 결과
16:25:13.026 [pool-1-thread-2] taskB 시작
16:25:13.026 [pool-1-thread-1] taskA 시작
16:25:13.026 [ main] [pool = 2, active = 2, queueTasks = 2, completedTaskCount = 0]
16:25:13.028 [ main] -- shutdown 시작 --
16:25:14.029 [pool-1-thread-1] taskA 완료
16:25:14.030 [pool-1-thread-1] taskC 시작
16:25:14.040 [pool-1-thread-2] taskB 완료
16:25:14.041 [pool-1-thread-2] longTask 시작
16:25:15.036 [pool-1-thread-1] taskC 완료
16:25:23.029 [ main] 서비스 정상 종료 실패 -> 강제 종료 시도
16:25:23.031 [pool-1-thread-2] 인터럽트 발생, sleep interrupted
16:25:23.032 [ main] -- shutdown 완료 --
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
16:25:23.039 [ main] [pool = 0, active = 0, queueTasks = 0, completedTaskCount = 4]
at util.ThreadUtils.sleep(ThreadUtils.java:15)
at thread.executor.RunnableTask.run(RunnableTask.java:25)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep0(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:509)
at util.ThreadUtils.sleep(ThreadUtils.java:12)
... 4 more
longTask가 끝나지 않아서 10초 기다립니다. 그러면 서비스 정상 종료 실패가 발생하고 강제 종료를 시도합니다. 강제 종료 시도하면 shutdownNow()
를 호출하고 인터럽트를 겁니다.
그러면 인터럽트 예외 터지기까지 완료됩니다.
이 예제에서 작업 처리에 필요한 시간은 taskA,B,C는 1초, longTask는 100초 입니다. shutdown()
을 하면 새로운 작업을 받지 않고 큐에 이미 대기 중인 작업은 처리하고 풀의 스레드는 종료합니다.
shutdown()
은 blocking 메서드가 아니기 때문에 서비스가 종료될 대까지 메인 스레드가 대기하지 않습니다.
if (!es.awaitTermination(10, TimeUnit.SECONDS))
이 부분이 blocking 메서드로 메인 스레드는 대기하여 서비스 종료를 10초간 기다립니다. 10초 안에 완료되면 true. longTask는 오래 걸리기 때문에 false를 반환합니다.
정상 종료가 오래 걸리므로 강제 종료에 들어가고 awaitTermination()
으로 기다립니다.
마지막으로 인터럽트가 발생하면서 작업 종료하고 강제 종료가 완료됩니다.
근데
shutdownNow()
를 호출한 다음에 10초간 또 기다리는 이유
shutdownNow()
가 인터럽트를 호출하지만 호출하더라도 여러가지 이유로 작업에 시간이 걸릴 수 있기 때문에 기다립니다. 인터럽트가 터지면 catch로 잡고나서 문제가 생겼을 때 자원을 정리하거나 로그를 남기는 간단한 작업들을 더 수행할 수 있기 때문에 시간을 기다려줍니다.
강제 종료 후에 10초간 대기해도 작업이 완료되지 않으면 서비스가 종료되지 않았다고 개발자가 인지할 수 있는 로그를 남겨야 합니다. 그래야 개발자가 수정 가능합니다.
corePoolSize
, maximumPoolSize
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(2, 4,
3000, TimeUnit.MILLISECONDS, workQueue);
총 task7까지 작업을 추가했습니다.
task를 4개 실행한다면 기본 스레드 2개 다 만들어졌고 BlockingQueue에도 2개가 모두 차게 되면 더 이상 들어갈 자리가 없게 됩니다.
여기서 추가해서 task를 넣게 되면 그제야 스레드 풀의 스레드가 하나 더 만들어집니다.
알고자 하는 maximumPoolSize
는 기본 스레드 전부, Queue까지 전부 다 차서 도저히 방법이 없을 때 맥시멈까지 늘어납니다.
맥시멈까지 했는데도 작업이 추가가 되면 RejectedExecutionException
예외가 터집니다.
실행 결과
20:10:03.397 [ main] [pool = 0, active = 0, queueTasks = 0, completedTaskCount = 0]
20:10:03.399 [pool-1-thread-1] task1 시작
20:10:03.403 [ main] task1 -> [pool = 1, active = 1, queueTasks = 0, completedTaskCount = 0]
20:10:03.403 [pool-1-thread-2] task2 시작
20:10:03.403 [ main] task2 -> [pool = 2, active = 2, queueTasks = 0, completedTaskCount = 0]
20:10:03.404 [ main] task3 -> [pool = 2, active = 2, queueTasks = 1, completedTaskCount = 0]
20:10:03.404 [ main] task4 -> [pool = 2, active = 2, queueTasks = 2, completedTaskCount = 0]
20:10:03.404 [ main] task5 -> [pool = 3, active = 3, queueTasks = 2, completedTaskCount = 0]
20:10:03.404 [pool-1-thread-3] task5 시작
20:10:03.404 [ main] task6 -> [pool = 4, active = 4, queueTasks = 2, completedTaskCount = 0]
20:10:03.404 [pool-1-thread-4] task6 시작
20:10:03.404 [ main] task7 실행 거절 예외 발생: java.util.concurrent.RejectedExecutionException: ~~
task 1,2,3,4,5,6 작업시작/완료
이후 추가로 3초 이후에 직업이 모두 완료되면 풀 사이즈는 4개, Queue에 있는 작업도 모두 꺼내서 완료하면 총 6개의 작업이 끝나게 됩니다. 스레드 풀에는 최대 크기인 4만큼 스레드가 대기하고 있는 상태입니다.
여기서 또 3초를 대기하면 이 3초 동안은 일이 없는 상태여서(작업 모두 완료해서) 기본 2개였던 스레드가 맥시멈으로 4개까지 추가됐었는데 추가된 스레드 2개가 자동으로 제거됩니다.
그래서 4개까지 늘어났던 스레드 풀의 사이즈가 기본 수만큼 다시 정상적으로 줄어듭니다.
실행 결과
20:10:06.410 [ main] -- 작업 수행 완료 --
20:10:06.411 [ main] [pool = 4, active = 0, queueTasks = 0, completedTaskCount = 6]
20:10:09.416 [ main] -- maximumPoolSize 대기 시간 초과
20:10:09.417 [ main] [pool = 2, active = 0, queueTasks = 0, completedTaskCount = 6]
20:10:09.418 [ main] -- shutdown 완료 --
20:10:09.419 [ main] [pool = 0, active = 0, queueTasks = 0, completedTaskCount = 6]
스레드 풀의 경우 최대 4개까지 스레드 수를 증가해서 사용했는데 이렇게 기본 수를 초과해서 만들어진 스레드를 "초과 스레드" 라고 합니다. 이 초과 스레드를 긴급한 상황에 사용하는데 이 긴급한 상황이라는 것은 Queue까지 가득 차서 더 이상 할 수 있는 일이 없을 때를 말합니다. Queue가 가득 차지 않으면 절대로 초과 스레드는 늘어나지 않습니다.
TimeUnit.MILLISECONDS
는 초과 스레드가 생존할 수 있는 대기 시간을 뜻합니다. 이 시간 동안 초과 스레드가 처리할 작업이 없다면 초과 스레드는 제거됩니다.
응답 시간이 아주 중요한 서버라면, 서버가 고객의 처음 요청을 받기 전에 미리 스레드를 만들어서 작업 수행하는 방식을 하고 싶을 수가 있습니다.
ThreadPoolExecutor.prestartAllCoreThreads()
를 사용하면 기본 스레드를 미리 생성할 수 있습니다.
예를 들어 서버가 첨 시작되고 트래픽이 많은 서비스라면 서버를 여러 대 운영할텐데 그 중에 한 대 서버를 올릴 때 이 서버에 갑자기 사용자가 확 몰리는 경우가 생길 수 있습니다. 스레드가 없다면 순간적으로 빠르게 스레드를 생성해야 하기 때문에 조금이나마 시간을 아낄 수 있습니다.
ThreadPoolExecutor
로 스레드 풀을 생성해서 기본 스레드, 최대 스레드, 초과 스레드 생존 대기 시간을 설정해서 조절해서 스레드 풀 전략을 사용했습니다.
대표적인 Executor
클래스의 기본 전략이 있습니다.
newSingleThreadPool()
: 단일 스레드 풀 전략newFixedThreadPool(nThreads)
: 고정 스레드 풀 전략newCachedThreadPool()
: 캐시 스레드 풀 전략newFixedThreadPool(nThreads)
- 고정 프레드 풀 전략스레드 수가 고정 -> CPU 메모리 리소스가 어느 정도 예측 가능한 안정적인 방식(CPU 메모리 리소스 예측 가능)
스레드 수가 늘어나지는 않음 -> 추가적인 작업은 큐에 저장
😡 단점
newCachedThreadPool()
- 캐시 스레드 풀 전략기본 스레드 X, 60초 생존 주기를 가진 초과 스레드만 사용하고 초과 스레드의 수는 제한이 없습니다. 큐에 작업을 저장하지 않고 SynchronousQueue
라는 큐를 사용합니다.
SynchronousQueue
는 버퍼 사이즈가 0인 대신 생산자 요청을 소비자가 직접 바로 받아서 스레드 풀의 소비자가 직접 받아서 바로 처리하는 방식입니다(생산자의 작업을 소비자 스레드에게 직접 전달). 모든 요청이 대기하지 않고 스레드가 바로 작동하면서 처리하여 빠른 처리가 가능합니다.
그래서 버퍼없이 생산자 소비자 스레드 간의 직거래처럼 생산자와 소비자를 동기화해버리는 큐입니다.
ExecutorService es = Executors.newFixedThreadPool(2);
생산할 때 생산자는 블로킹 상태가 됩니다. 이 블로킹 상태는 소비자 스레드가 작업 요청을 할 때까지 유지됩니다. 소비자 스레드가 값을 달라고 해야 실제로 생산자로부터 값을 바로 가져갑니다.
작업 100개 넣으면 스레드 100개 만들어져서 돌아갑니다.
빠른 처리 가능하고 초과 스레드의 수도 제한이 없어 CPU, 메모리 자원만 허용한다면 시스템의 자원을 최대로 사용할 수 있습니다. 초과 스레드가 60초 동안 생존하기 때문에 작업 수에 맞춰 적절한 수의 스레드가 재사용됩니다.
그래서 요청이 갑자기 증가해도 스레드도 갑자기 증가하고 요청이 줄어들면 스레드도 점점 줄어듭니다. (매우 유연한 전략)
😡 단점
세분화된 전략을 사용해서 사용자가 점점 확대되고 요청이 폭증하는 상황을 방지해야 합니다.
1. 일반 : CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드로 서비스를 안정적으로 운영
2. 긴급 : 요청이 폭증하면 긴급하게 스레드를 추가로 투입해서 작업을 빠르게 처리
3. 거절 : 요청이 폭증해서 긴급 대응도 어렵다면 사용자의 요청을 거절
평소에는 안정적으로 운영하다가 사용자 요청이 폭증하면 긴급하게 스레드를 더 투입해서 긴급한 상황을 해결하는 방법입니다. 물론 긴급 상황에는 CPU, 메모리 자원을 더 사용하기 때문에 또 적정 수준을 찾아야 합니다.
그래서 처리 가능한 수준의 요청만 처리하고 나머지는 거절해야 합니다. 어떻게든 시스템이 다운되는 최악의 상황은 피해야 합니다
ThreadPoolExecutor es = new ThreadPoolExecutor(100, 200, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
100개의 기본 스레드를 사용하지만 무제한이 아닌 딱 1000개까지만 작업 큐에 대기하도록 하고 1000개가 넘어가면 긴급한 상황이니까 초과 스레드를 100개 더 투입하는 방식입니다. 만약 이래도 해결이 안 된다면 거절하는 정책입니다.
결과 시나리오
1. 일반 : 1000개 이하의 작업이 큐에 대기 -> 100개의 기본 스레드가 처리
2. 긴급 : 큐에 담긴 작업이 1000개를 초과 -> 100개의 기본 스레드 + 100개의 초과 스레드가 처리
3. 거절 : 초과 스레드 투입하지만 큐에 담긴 1000개의 작업을 추과 스레드도 넘어가서 예외 발생
작업 거절하는 예외 정책
RejectedExecutionException
발생AbortPolicy()
의 경우 // 큐에 다 담기고 나서 Reject가 되지 않게 하기 위함 -> 빠른 Reject 받기
ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
0, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
AbortPolicy()
는 RejectedExecutionHandler
의 구현체입니다. new ThreadPoolExecutor()
의 마지막 파라미터로 RejectedExecutionHandler
가 있습니다.
스레드 풀 보고 더 이상 요청을 받지 못하면 rejectedExecution
을 호출합니다.
AbortPolicy() 구현체
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy() 구현체
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
@FunctionalInterface
public interface Runnable {
/**
* Runs this operation.
*/
void run();
}
r.run();
은 Runnable 메서드를 직접 호출합니다. 이 run()
을 다른 스레드가 아닌 메인 메서드(내)가 직접 메서드 호출로 부릅니다.
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
static AtomicInteger count = new AtomicInteger(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 실패에 대한 것 보다는 거절된 누적 작업이 몇 개인지 알아보기
int i = count.incrementAndGet();
log("[경고] - 거절된 누적 작업 수 : " + i);
}
}
이렇게 직접 만들어 버리면 그만!!
submit()
으로 호출해서 작업을 못하게 되면 rejectedException
을 호출하고 메인 스레드가 실행해버립니다. 그래서 거절된 작업을 버리지만, 대신 경로 로그를 남겨서 개발자가 문제를 인지할 수 있도록 할 수 있습니다.