ExecutorService
는 이미 진행중인 작업을 모두 완료한 다음 종료할 수 있도록 Graceful Shutdown을 아래 메소드로 지원한다.
shutdown()
void shutdown();
shutdownNow()
List<Runnable> shutdownNow();
isShutdown()
boolean isShutdown();
isTerminated()
boolean isTerminated();
shutdown()
, shutdownNow()
호출 후 모든 작업이 완료되었는지 확인awaitTermination(long timeout, TimeUnit unit)
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
close()
@Override
default void close() {
boolean terminated = isTerminated();
if (!terminated) {
shutdown();
boolean interrupted = false;
while (!terminated) {
try {
terminated = awaitTermination(1L, TimeUnit.DAYS);
} catch (InterruptedException e) {
if (!interrupted) {
shutdownNow();
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
shutdown()
호출 후 작업 완료를 하루 동안 대기, 이후에도 완료되지 않으면 shutdownNow()
호출shutdownNow()
가 호출됨ThreadPool
관리 전략Executors
클래스에서는 ThreadPool
을 효과적으로 관리하기 위한 여러 전략들을 제공한다.
이를 사용하거나 직접 정의할 수 있는데, 아래에서 상세히 알아본다.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return newSingleThreadExecutor(defaultThreadFactory());
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new AutoShutdownDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newFixedThreadPool(1)
과 달리 초과 Thread를 사용하도록 재구성할 수 없음newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
Executor
newSingleThreadExecutor
와 동일newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
SynchronousQueue
를 사용하여 별도 버퍼를 사용하지 않으며, 생산자가 소비자에게 직접 작업을 전달하고 반환하는 방식newThreadPerTaskExecutor
public static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) {
return ThreadPerTaskExecutor.create(threadFactory);
}
Executor
생성Future
에서 cancel(true)
호출 시 작업 실행 Thread 중단newVirtualThreadPerTaskExecutor
public static ExecutorService newVirtualThreadPerTaskExecutor() {
ThreadFactory factory = Thread.ofVirtual().factory();
return newThreadPerTaskExecutor(factory);
}
Executor
생성ThreadFactory
를 사용하여 newThreadPerTaskExecutor()
를 호출하는 방식newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
ThreadPoolExecutor
의 생성자를 통해 직접 수치를 조정하면서 Executor
를 생성하는 방식이다.
적정 수치는 성능 테스트 등을 통해 검증할 수 있다.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
... 중략 ...
}
단, 1편에서 말한 것처럼
maximumPoolSize
의 경우workQueue
가 모두 찬 상태에서 작업이 요청된 경우maximumPoolSize
까지 Thread가 생성된다는 의미이다.
maximumPoolSize
를 의도한대로 사용하기 위해서는workQueue
의 사이즈가 제한되어 있는지 확인하는 것이 좋다.
ThreadFactory
Executors
에서 기본 제공되는 전략을 사용할 때 ThreadFactory
를 지정하지 않는 경우 기본적으로 DefaultThreadFactory
를 사용하도록 설정된다.
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
DefaultThreadFactory
는 ThreadFactory
인터페이스를 구현하고 있다.
public interface ThreadFactory {
Thread newThread(Runnable r);
}
상세 구현은 아래와 같다.
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
@SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadPoolExecutor
예외 정책ThreadPoolExecutor
의 생성자를 통해 대기 큐가 꽉 찼을 때 새로운 작업이 요청된 경우 처리 방안을 RejectedExecutionHandler
를 통해 구현할 수 있다.
ThreadPoolExecutor
에는 기본 핸들러로 AbortPolicy
가 등록되어 있다.
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
RejectedExecutionException
예외를 발생시켜 새로운 요청 거절DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
RejectedExecutionHandler
를 직접 구현하여 필요한 기능을 구현한다.
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}