Java - Executor 스레드 풀 관리 전략

INHEES·2025년 1월 15일

금일은 Java 의 다양한 Executor 스레드 풀 관리 전략에 대해 알아보겠습니다.

목차

  • 고정 풀 전략
  • 캐시 풀전략
  • 사용자 정의 풀전략
  • Executor 예외 정책

고정 풀 전략

Executors 클래스를 통해 3가지 기본 스레드를 기본 전력을 제공합니다.

  • newSingleThreadPool(): 단일 스레드 풀 전략
    • 스레드 풀에 기본 스레드 1개만 사용합니다.
    • 큐 사이즈에 제한이 없다.(LinkedBlockingQueue)
    • 간단히 사용하기에 테스트 용도로 사용합니다.
      new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>())
  • newFixedThreadPool(nThreads): 고정 스레드 풀 전략
  • newCachedThreadPool(): 캐시 스레드 풀 전략

newFixedThreadPool(nThreads)

  • 스레들 풀에 nThreads 만큼의 기본 스레드를 생성하며 초과 스레드는 생성하지 않습니다.
  • 큐 사이즈에 제한이 없습니다.
  • 스레드 수가 고정되어 있기 때문에, cpu, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식입니다.
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>())  
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        //ExecutorService es = new ThreadPoolExecutor(2, nThreads,
        //        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        log("pool 생성");
        printState(es);

        for (int i = 1; ; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName)); // implements Runnable
            printState(es, taskName);
        }
        es.close();
        log("== shutdown 완료 ==");

    } 
                              

이 방식의 가장 큰 장점은 스레드 수가 고정되어서 cpu, 메모리 리소스가 어느 정도 예측이 가능합니다. 일반적인 상황에 가장 안정적으로 서비스를 이용 가능합니다.

고정 스레드 전략을 사용하게 된다면 추후에 트래픽이 증가하게되면 고정된 스레드 풀에 의해 큐에 무한히 작업이 쌓이고 사용자가 느린 경험을 하게 될 것이다.

결국에는 서버 자원은 여유가 있지만, 사용자가 점점 느려지는 문제가 발생하게 됩니다


캐시 풀 전략

newCachedThreadPool()

  • 기본 스레드를 사용하지 않고 , 60초 생존 주기를 가진 초과 스레드만 사용합니다.

  • 초과 스레드 수의 제한이 없다.

  • 큐에 작업을 저장하지 않는다 (SynchronousQueue)

    • 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리합니다.
  • 모든 요청이 대기하지 않고 스레드가 바로바로 처리합니다.

    1.ExecutorService es = Executors.newCachedThreadPool();
    
    2.new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    

SynchronousQueue

  • BlockingQueue 인터페이스의 구현체중 하나입니다.
  • 이 큐는 내부에 저장공간이 없고 생산자의 작업을 소비자 스레드에게 직접 전달한다 (Redis pub/sub 기능과 비슷 합니다.)
  • 이름 그대로 생상자와 소비자를 동기화 하는 큐이다.

특징

  • 캐시 스레드 풀 전략은 매우 빠르고, 유연한 전략입니다.
  • 기본 스레드도 없고, 대기 큐에 작업이 쌓이지 않습니다.
  • 트래픽에 따라 자동 으로 스레드의 수는 스케일링 됩니다.
  • 초과 스레드가 생성되며 풀에 대기하는 초과 스레드가 있다면 재사용 됩니다.

주의할 점으로는 사용자가 점점 증가하면서 스레드 사용량도 함께 늘어나며 cpu 메모리의 사용량도 자연스럽게 증가하며 자원의 한계가 있기 때문에 적절한 시점에 시스템을 증설해야 합니다. 그렇지 않으며 시스템이 다운 될 수 있습니다.

캐시 스레드 풀 전략은 서버의 자원을 최대한 사용하지만, 서버가 감당할 수 있는 임계점을 파악해야 합니다.


사용자 정의 풀 전략

해당 전략은 쉽게 말해서 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 투입하는 방법입니다.

처리 가능한 수준의 사용자 요청만 처리하고 나머지 요청은 거절해야 해야 서버가 다운되는 최악의 상황을 피할 수 있는 전략입니다.

  ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000));

해당 코드를 보면 기본 스레드를 100개 사용하고 긴급한 상황이면 생명주기가 60초인 100개의 초과 스레드를 사용할 수 있습니다.

1000r개의 작업이 큐에 대기할 수 있습니다.

    static final int TASK_SIZE = 1100; // 1. 일반
    //static final int TASK_SIZE = 1200; // 2. 긴급
    //static final int TASK_SIZE = 1201; // 3. 거절

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(100, 200,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
        printState(es);

        long startMs = System.currentTimeMillis();
        for (int i = 1; i (= TASK_SIZE; i++) {
            String taskName = "task" + i;
            try {
                es.execute(new RunnableTask(taskName));
                printState(es, taskName);
            } catch (RejectedExecutionException e) {
                log(taskName + " -> " + e);
            }
        }


        es.close();
        long endMs = System.currentTimeMillis();
        log("time: " + (endMs - startMs));

    }  

기본 스레드의 개수와 큐의 사이즈의 합을 넘지 않는다면 초과 스레드는 사용되지 않습니다.

큐의 크기와 초과 스레드의 개수의 합을 넘게되는 작업이 들어오게 되면 RejectedExecutionException 해당 에러가 발생하게 됩니다.

초과 스레드가 사용되게 되는 작업의 총개수가 1200개 일때는 1200 / 200 = 6초 로 두배 짜른 속도로 처리가 가능합니다. (cpu, 메모리 사용량의 정책 필수)

주의

new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue());  

다음과 같이 큐의 사이즈를 문한대로 사용하게 되면 초과 스레드는 사용되지 않기 때문에 주의 해야 합니다.
=> ArrayBlockingQueue


Executor 예외 정책

소비자가 처리할 수 없을 정도로 생산 요청이 가득 차면 어떻게 처리해야 할지 예외 정책을 정해야 합니다.

ThreadPoolExecutor

작업을 거절하는 다양한 정책을 제공합니다.

  • AbortPolicy: 새로운 작업을 제출할 때 RejectedExecutionException 을 발생시킨다. 기본 정책이다.

  • DiscardPolicy: 새로운 작업을 조용히 버린다.

  • CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행한다.

  • 사용자 정의( RejectedExecutionHandler ): 개발자가 직접 정의한 거절 정책을 사용할 수 있다.

AbortPolicy

    public static void main(String[] args) {
      ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
              new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());

      executor.submit(new RunnableTask("task1"));

      try {
          executor.submit(new RunnableTask("task2"));
      } catch (RejectedExecutionException e) {
          log("요청 초과");
          // 포기, 다시 시도 등 다양한 고민을 하면 됨
          log(e);
      }

      executor.close();
  }

테스트를 편리하기 위해서 SynchronousQueue 를 사용하며 생성자 마지막에 AbortPolicy 해당 정책을 넣어주면 됩니다.

AbortPolicy의 경우 기본 정책이기에 생략해도 무방합니다.


public static class AbortPolicy implements RejectedExecutionHandler {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           throw new RejectedExecutionException("Task " + r.toString() +
           " rejected from " +
           e.toString());
       }
}

ThreadPoolExecutor 의 생성자의 마지막은 Handler 를 받게 되어 있습니다.

AbortPolicy는 RejectedExecutionHandler의 구현체이다. 때문에 ThreadPoolExecutor 생성자는 RejectedExecutionHandler 의 구현체를 전달 받는다.

DiscardPolicy

거절된 작업은 무시하고 아무런 예외도 발생시키지 않습니다.

public static class DiscardPolicy implements RejectedExecutionHandler {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       // empty
   }
}

CallerRunsPolicy

호출한 스레드가 직접 작업을 수행하게 됩니다. 이로 인해 새로운 작업을 제출하는 스레드의 속도가 느려질 수 있다.

이 정책의 특징은 생산자 스레드가 소비자 대신 일을 수행하는 것도 있지만, 생산자 스레드가 대신 일을 수행하는 덕분에 작업의 생산 자체가 느려진다는 점이다

스레드 풀에 포관할 큐도 없고, 작업할 스레드가 없다면 거절하지 않고 작업을 요청한 main thread 에 대신 일을 시키게 됩니다.

main thread 는 task1를 본인이 직접 완료하고 나서 다음 task2 를 생상할 수 있고 결과적으로 생산 속도를 조절할 수 있습니다.


public static class CallerRunsPolicy implements RejectedExecutionHandler {
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         if (!e.isShutdown()) {
             r.run();
         }
     }
}

main threadrejectedExecution 해당 메서드를 호출하게 됩니다.

그런데 CallerRunsPolicy 정책은 shutdown() 이후에도 작업을 수행해버린다. 따라서 shutdown()
조건을 체크해서 이 경우에는 작업을 수행하지 않도록 한다.

사용자 정의

사용자 정의(RejectedExecutionHandler): 사용자는 RejectedExecutionHandler 인터페이스를 구현
하여 자신만의 거절 처리 전략을 정의할 수 있다.

이를 통해 특정 요구사항에 맞는 작업 거절 방식을 설정가능합니다.

    public static void main(String[] args) {
      ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
              new SynchronousQueue<>(), new MyRejectedExecutionHandler());

      executor.submit(new RunnableTask("task1"));
      executor.submit(new RunnableTask("task2"));
      executor.submit(new RunnableTask("task3"));
      executor.close();
  }

  static class MyRejectedExecutionHandler implements RejectedExecutionHandler {

      static AtomicInteger count = new AtomicInteger(0);

      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          int i = count.incrementAndGet();
          log("[경고] 거절된 누적 작업 수: " + i);
      }
  }

해당 코드에서 submit 함수가 실행되고 rejectedExecution 를 특정상황에 호출하게 됩니다.

가장 좋은 방법은 서비스의 트래픽을 예측해서 최적화 상황을 초기에 완성해놓으면 가장 좋을것 같습니다.


참고

inflearn

profile
이유를 찾아보자

0개의 댓글