Java - ExecutorService 종료, Executor 스레드 풀 관리

INHEES·2025년 1월 14일

금일은 ExecutorService 의 종료 방법과, Executor 스레드 풀관리 에 대해 알아보는 시간입니다.

목차

  • ExecutorService 종료
  • Executor 스레드 풀 관리
  • Executor 스레드 풀 관리 전략 소개

ExecutorService graceful shutdown

서버 애플리케이션이 주문 로직을 처리한다고 가정할때 서버를 종료해야 된다면 진행중인 주문은 모두 완료한 다음에 새로운 주문 요청은 막아 서비스를 안정적으로 종료하는 것을 graceful shutdown 이라고 한다.

ExecutorService에는 종료와 관련된 다야한 메서드가 존재한다

  • void shutdown()

    • 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료한다.
    • 논 블로킹 메서드(해당 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다.)
  • List shutdownNow()

    • 실행 중인 작업을 중단하고, 대기 중인 작업ㅇ르 반환하며 즉시 종료한다.
    • main thread 가 해당 메서드를 기다리지 않는다.
    • 실행중인 작업을 중단하기 위해 인터럽트를 발생시킨다.
    • 논 블로킹 메서드
  • boolena isShutdown()

    • 서비스가 종료되었는지 확인한다.
  • boolean isTerminated()

    • shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한다.
  • boolean awaitTermination(long timeout, TimeUnit unit) throws
    InterruptedException

    • 서비스 종료시 모든 작업이 완료될 때까지 대기하며 시간이 주어진다.
    • 블로킹 메서드
  • close()

    • 자바 19버전부터 지원하는 서비스 종료 메서드로 shutdown() 과 같은 기능을 제공합니다.
    • 호출한 스레드에 인터럽트가발생해도 shutdownNow() 를 호출합니다.

처리중인 작업이 없는 경우

  • shutdown() 을 호출합니다.

  • Executor Service 는 새로운 요청을 거절한다.

  • 거절시 java.util.concurrent.RejectedExecutionException 예외가 발생합니다.

처리중인 작업이 있는 경우

shutdown()

  • shutdown() 을 호출합니다.
  • ExecutorService 는 새로운 요청을 거절합니다.
  • 스레드 풀의 스레드는 처리죽인 작업을 완료하며 큐에 남아있는 작업도 완료시킵니다.

shutdownNow()

  • ExecutorService 는 새로운 요청을 거절한다.
  • 큐를 비우면서, 큐에 있는 작업을 모두 꺼내서 컬렉션으로 반환한다
    	List<Runnable> runnables = es.shutdownNow()
  • 작업 중인 스레드에 인터럽트가 발생합니다
    - 큐에 대기중인 작업이 수행되지 않습니다.

구현

close() 함수의 경우 하루를 기다려도 작업이 완료되지 않으면 shutdownNow() 를 호출하게 되도록 구현되어 있지만 빠른 종료를 위해 shutdownNow() 를 사용하여 사용자가 지정한 시간에 종료되도록 설정합니다.

해당 작업을 shutdown 하는 코드는 친절하게도 ExecutorServiced interface 의 공식 몌뉴얼을 보게되면 나와있습니다.

  public class RunnableTask implements Runnable {

    private final String name;
    private int sleepMs = 1000;

    public RunnableTask(String name) {
        this.name = name;
    }

    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public void run() {
        log(name + " 시작");
        sleep(sleepMs); // 작업 시간 시뮬레이션
        log(name + " 완료");
    }
}
public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("longTask", 100_000)); // 100초 대기
        printState(es);
        log("== shutdown 시작");
        shutdownAndAwaitTermination(es);
        log("== shutdown 완료");
        printState(es);
    }

    private static void shutdownAndAwaitTermination(ExecutorService es) {
        es.shutdown(); // non-blocking, 새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미 대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
        try {
            // 이미 대기중인 작업들을 모두 완료할 때 까지 10초 기다린다.
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                // 정상 종료가 너무 오래 걸리면...
                log("서비스 정상 종료 실패 -> 강제 종료 시도");
                es.shutdownNow();
                // 작업이 취소될 때 까지 대기한다.
                if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                    log("서비스가 종료되지 않았습니다.");
                }
            }
        } catch (InterruptedException e) {
            // awaitTermination()으로 대기중인 현재 스레드가 인터럽트 될 수 있다.
            es.shutdownNow();
        }

    }  
  

if (!es.awaitTermination(10, TimeUnit.SECONDS)) 해당 조건문은 해당 스레드가 인터럽트를 못받도록 설계되어 있다면 개발자에게 알려주거나 java 를 강제로 종료하는 방법 이있습니다.

catch (InterruptedException e) 해당 코드는 현재 스레드에 인터럽트를 걸게 된다면 바로 종료되도록 구현 가능합니다.

shutdownNow() 메서드가 실행되게 된다면 RunnableTask 함수 안의 longTask 작업이 run() 메서드의 sleep 메서드에 의해서 InterruptException 이 발생하게 됩니다.

두번째 강제 종료인 es.shutdownNow() 함수 호출시에 10초를 기다리는 이유는 극단적이지만 인터럽트 후에 자원을 정리하는 작업을 기다려 주기 때문입니다.

정리

서비스를 종료할 때 기본적으로 우아한 종료를 선택하고 우아한 종료가 되지 않으면 그다음 일정 시간 후에 강제 종료 하는 방식으로 접근하는게 좋을 것 같습니다.


Executor 스레드 풀 관리

해당 부분은 대량의 요청을 별도의 스레드에서 어떤식으로 처리해야 하는지 알아갈 수 있습니다.

ThreadPoolExecutor 해당 ExecutorService 의 기본 구현체의 속성에 대해 저번 시간에 알아보았습니다.

corePoolSize , maximumPoolsize 의 차이를 알아보겠습니다.

ExecutorUtils 에 해당 method 추가

      public static void printState(ExecutorService executorService, String taskName) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTask = poolExecutor.getCompletedTaskCount();
            log(taskName + " -> [pool=" + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
        } else {
            log(executorService);
        }
    }

Main Method

public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(2, 4,
                3000, TimeUnit.MILLISECONDS, workQueue);
        printState(es);

        es.execute(new RunnableTask("task1"));
        printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        printState(es, "task6");

        try {
            es.execute(new RunnableTask("task7"));
        } catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생: " + e);
        }

        sleep(3000);
        log("== 작업 수행 완료 ==");
        printState(es);

        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        printState(es);

        es.close();
        log("== shutdown 완료 ==");
        printState(es);
    }
  

해당 코드를 실행하게 되면 task 2번까지 active 의 실행중인 스레드는 2개이며 task 4번까지 실행하면 queue 에 2개의 작업이 들어가게 됩니다.

작업을 6까지 실행하게 되면 maximumPool = 4 이므로 pool size 가 4까지 증가 하게 됩니다.

그다음 task 7번을 실행하게 되면 예외가 발생하게 됩니다. RejectedExecutionException

maximumPoolSize 가 지정된 시간인 3초가 지나게 되면 기보 PoolSize = 2 로 변하게 됩니다.

마지막으로 close() 함수가 호출되게 되면 원래의 기본스레드 = 2도 0으로 변하게 됩니다.


Executor 스레드 풀 관리 - 분석

위의 내용을 순서 상으로 정리해보겠습니다.

  • Executor 는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.

    • core 사이즈 만큼 없다면 스레드를 하나 생성한다.
  • core 사이즈 만큼 스레드가 이미 만들어져 있고, 스레드 풀에 사용할 수 있는 스레드가 없으면 이 경우 큐에 작
    업을 보관한다.

  • 큐가 가득차게 되면 초과 스레드를 만들어서 작업을 수행한다.

    • max=4 : 기본 스레드 2개에 초과 스레드 2개 합계 총 4개 가능 (초과 스레드 = max - core )
  • 스레드 풀의 스레드도 max 사이즈 만큼 가득찼다면 큐에 작업도 가득ㅊ찬 상태이므로 RejectedExecutionException 예외가 발생합니다.

  • 스레드가 차례로 작업을 완료하고 스레드 풀에 대기 상태로 돌아가게 됩니다.

  • 스레드 풀의 스레드는 큐의 데이터를 획득하기 위해 대기한다.

  • 초과 스레드들은 지정된 시간까지 작업ㅇ르 하지 않고 대기하며 제거 됩니다.

스레드 미리 생성하기

응답시간이 아주 중요한 서버라면 요청을 받기 전에 스레드를 스레드 풀에 미리 생성해 둘 수 있습니다.

이렇게 되면 스레드 생성 시간을 줄일 수 있습니다.
ThreadPoolExecutor.prestartAllCoreThreads() 을 사용하면 기본 스레드를 미리 생성할 수 있습니다.

ExecutorService 는 해당 메서드를 제공하지는 않습니다.

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1000);
        printState(es);
        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
        poolExecutor.prestartAllCoreThreads();
        printState(es);
    }  

참고자료

inflearn

profile
이유를 찾아보자

0개의 댓글