[Java] ThreadPool과 Executor 프레임워크(2)

MEUN·1일 전
0

우아한 종료, Graceful Shutdown

ExecutorService는 이미 진행중인 작업을 모두 완료한 다음 종료할 수 있도록 Graceful Shutdown을 아래 메소드로 지원한다.

유관 메소드

shutdown()

void shutdown();
  • 새로운 작업을 받지 않고, 이미 요청돤 작업을 모두 완료한 후 종료
  • Non-Blocking으로 동작

shutdownNow()

List<Runnable> shutdownNow();
  • 실행 중인 작업 중단 및 대기 중인 작업 반환 후 즉시 종료
  • 실행 중인 작업을 중단하기 위해 인터럽트 발생
  • Non-Blocking으로 동작

isShutdown()

boolean isShutdown();
  • 서비스 종료 여부 확인

isTerminated()

boolean isTerminated();
  • shutdown(), shutdownNow() 호출 후 모든 작업이 완료되었는지 확인

awaitTermination(long timeout, TimeUnit unit)

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  • 서비스 종료 시 모든 작업이 완료되기를 지정된 시간동안 대기
  • Blocking 방식으로 동작

close()

    @Override
    default void close() {
        boolean terminated = isTerminated();
        if (!terminated) {
            shutdown();
            boolean interrupted = false;
            while (!terminated) {
                try {
                    terminated = awaitTermination(1L, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    if (!interrupted) {
                        shutdownNow();
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
  • shutdown() 호출 후 작업 완료를 하루 동안 대기, 이후에도 완료되지 않으면 shutdownNow() 호출
  • 호출한 Thread에 인터럽트가 발생한 경우에도 shutdownNow()가 호출됨
  • JDK 19부터 지원



ThreadPool 관리 전략

Executors 클래스에서는 ThreadPool을 효과적으로 관리하기 위한 여러 전략들을 제공한다.
이를 사용하거나 직접 정의할 수 있는데, 아래에서 상세히 알아본다.

기본 제공

newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return newSingleThreadExecutor(defaultThreadFactory());
    }
    
	public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new AutoShutdownDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • 단일 Thread를 사용하는 전략
  • 대기 큐의 크기는 무제한
  • 작업 실행 중 장애로 인해 단일 스레드 종료 시 새로운 Thread가 생성됨
  • 한 번에 두 개 이상의 작업이 활성화되지 않으며, 초과 Thread를 사용
  • newFixedThreadPool(1)과 달리 초과 Thread를 사용하도록 재구성할 수 없음

newSingleThreadScheduledExecutor

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
  • 지정 시간동안 대기 후 실행하거나 주기적으로 실행하도록 명령 예약 가능한 단일 Thread를 사용하는 Executor
  • 이외 특징은 newSingleThreadExecutor와 동일

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
  • 지정 시간동안 대기 후 실행하거나 주기적으로 실행하도록 명령 예약 가능한 ThreadPool 생성

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
	public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
  • 고정된 수의 Thread를 재사용하는 ThreadPool 생성
  • 모든 Thread가 활성 상태일 때 추가 작업 요청 시 Thread를 사용할 수 있을때까지 대기 큐에서 대기
  • 명시적으로 종료하기 전까지 Thread 유지
  • 고정된 수의 Thread를 사용하므로 시스템 자원 사용량 예측 가능
  • 대기 큐의 크기는 무제한이므로 처리 속도보다 작업이 쌓이는 속도가 더 빠른 경우 문제 발생 가능

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
  • 필요에 따라 새 Thread를 생성하지만 이전에 생성된 Thread를 사용할 수 있을 때 재사용하는 ThreadPool 생성
  • 일반적으로 수명이 짧은 비동기 작업을 많이 실행하는 프로그램의 성능 향상 가능
  • 사용 가능한 기존 Thread가 없는 경우 새 Thread가 생성되어 ThreadPool에 추가되며, 60초동안 사용되지 않으면 Thread 종료 후 캐시에서 제거됨
  • SynchronousQueue를 사용하여 별도 버퍼를 사용하지 않으며, 생산자가 소비자에게 직접 작업을 전달하고 반환하는 방식
  • 매우 유연한 전략이지만 시스템의 자원을 제한 없이 사용할 수 있어 주의가 필요함

newThreadPerTaskExecutor

    public static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) {
        return ThreadPerTaskExecutor.create(threadFactory);
    }
  • 각 작업에 대해 새 Thread를 시작하는 Executor 생성
  • 생성하는 Thread 수에 제한 없음
  • Future에서 cancel(true) 호출 시 작업 실행 Thread 중단
  • JDK 21부터 지원

newVirtualThreadPerTaskExecutor

    public static ExecutorService newVirtualThreadPerTaskExecutor() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        return newThreadPerTaskExecutor(factory);
    }
  • 각 작업에 대해 새 Virtual Thread를 시작하는 Executor 생성
  • 생성하는 Thread 수에 제한 없음
  • Virtual Thread를 생성하는 ThreadFactory를 사용하여 newThreadPerTaskExecutor()를 호출하는 방식
  • JDK 21부터 지원

newWorkStealingPool

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
  • 지정된 병렬 처리 수준을 지원하기 위해 충분한 Thread를 유지하는 ThreadPool 생성
    • 병렬 처리 수준은 작업 처리에 참여 중이거나 참여할 수 있는 최대 Thread 수에 해당
  • 경합을 줄이기 위해 여러 개의 대기 큐 사용 가능
  • 실제 Thread 수는 동적으로 증가/감소
  • 요청된 작업의 실행 순서를 보장하지 않음



사용자 정의

ThreadPoolExecutor의 생성자를 통해 직접 수치를 조정하면서 Executor를 생성하는 방식이다.
적정 수치는 성능 테스트 등을 통해 검증할 수 있다.

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
                              ... 중략 ...
	}

단, 1편에서 말한 것처럼 maximumPoolSize의 경우 workQueue가 모두 찬 상태에서 작업이 요청된 경우 maximumPoolSize까지 Thread가 생성된다는 의미이다.
maximumPoolSize를 의도한대로 사용하기 위해서는 workQueue의 사이즈가 제한되어 있는지 확인하는 것이 좋다.



ThreadFactory

Executors에서 기본 제공되는 전략을 사용할 때 ThreadFactory를 지정하지 않는 경우 기본적으로 DefaultThreadFactory를 사용하도록 설정된다.

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

DefaultThreadFactoryThreadFactory 인터페이스를 구현하고 있다.

public interface ThreadFactory {

    Thread newThread(Runnable r);
}

상세 구현은 아래와 같다.

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            @SuppressWarnings("removal")
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }


ThreadPoolExecutor 예외 정책

ThreadPoolExecutor의 생성자를 통해 대기 큐가 꽉 찼을 때 새로운 작업이 요청된 경우 처리 방안을 RejectedExecutionHandler를 통해 구현할 수 있다.

ThreadPoolExecutor에는 기본 핸들러로 AbortPolicy가 등록되어 있다.

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

기본 제공

AbortPolicy

    public static class AbortPolicy implements RejectedExecutionHandler {
    
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • RejectedExecutionException 예외를 발생시켜 새로운 요청 거절

DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {
    
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
  • 새로운 작업 요청 시 무시

DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
  • 새로운 작업 요청 시 가장 오래된 요청을 무시
  • 다른 Thread가 작업이 종료되기를 기다리거나 작업 실패를 기록해야 하는 경우 유용하지 않음

CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
    
        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • 새로운 작업을 요청한 Thread가 직접 작업 실행



사용자 정의

RejectedExecutionHandler를 직접 구현하여 필요한 기능을 구현한다.

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}


참고 자료

0개의 댓글