(Java) Thread pool 생성과 관리

BaekGwa·2024년 8월 25일
0

✔️ Java

목록 보기
7/12

Thread Pool


Runnable

Thread 생성 비용

  • 새로운 Thread를 만드는 것은 매우 무거운 작업이다.
    1. 메모리 할당 : Thread는 자신만의 호출 스택을 가지고 있다. 이 메모리는 보통 1MB 이상의 메모리를 할당하여 사용한다.
    2. 운영체제 자원 사용 : Thread를 생성하는 작업은 운영체제 커널 수준에서 이뤄진다. 시스템 콜을 이용하여 처리되는데, 이때 CPU와 메모리 리소스를 소모한다.
    3. 운영체제 스케줄러 설정 : 새로운 Thread가 생성되면, 이 스레드의 실행 순서를 관리하고 조정하는 스케줄러가 동작한다. 추가적인 Thread 가 생성 될 수록, 운영체제의 스케줄러는 관리 작업을 진행하여 추가적인 오버헤드가 발생한다.

Runnable 생성의 단점

  • Runnable 인터페이스 구현을 통해 Thread를 생성하게 되면 몇가지 단점이 존재한다.
    1. 반환 값이 없다 : void run(); 메서드를 재정의 하여 사용하는 Runnable의 특성상, Thread의 생성 완료의 결과값을 바로 return 받아서 사용할 수 없다. 변수를 사용하고, join()을 사용하여 결과 완료를 명시적으로 완료 하고 확인하여 처리할 수 있다.
    2. 예외처리가 없다 : Runnable 객체는 체크 예외에 대한 부분이 처리되어있지 않다. 따라서 Thread.sleep(ms) 등을 사용할 때, 별도의 예외 (try-catch)처리가 필요하다.
    3. Thread 재활용이 되지 않는다 : 앞서 Thread 생성비용이 꽤 무거운 작업임을 알 수 있었다. 한번의 Thread는 완료가 된 이후에는 재사용할 수 없다. 재사용 하기 위해서는 Thread를 관리하는 객체를 별도로 생성하여야 한다.
    4. Thread 생성으로 인한 오버헤드 : 3번 문제와 연결되는 문제로, 재활용이 되지 않아, 지속적인 생성 작업으로 인해 성능 오버헤드가 발생 할 수 있다.

Java의 해결

  • Java는 이러한 단점을 해결하기 위해, Executor 프레임워크를 제공하여 Thread 관리를 쉽게 할 수 있도록 지원한다.

Executor 프레임워크

Executor 소개

  • Executor 또한, 인터페이스로, Runnable 객체를 입력받아 실행시켜주는 동일한 인터페이스다.
package java.util.concurrent;

public interface Executor {
	void execute(Runnable command);
}

ExecutorService 소개

  • 이 Executor를 상속받아 다양한 작업을 할 수 있도록 만든, ExecutorService 인터페이스가 존재한다.
public interface ExecutorService extends Executor, AutoCloseable {
	<T> Future<T> submit(Callable<T> task);
    
    ...
}
  • ExecutorService에는 다양한 메스닥 존재하며, 실행을 위한 submit(), ThreadPool을 종료하기 위한 close() 등이 있다.
  • submit()은 Callable 객체를 상속 받는데, 이는 반환값이 있는 Runnable 객체라고 생각하면 된다.
  • submit()은 Future 객체를 반환하는데, 이는 결과값을 의미하며, 완료 여부, 완료 시 결과 값 등을 가지고 있는 객체이다.
  • 해당 인터페이스를 구현하고 있는 대표적인 구현체는, ThreadPoolExecuto가 있다.

ExecutorService 사용. 예제1)

  • ExecutorService는 다양한 매개변수를 입력 받는다.
  • corePoolSize : 스레드 풀의 기본 크기를 정의합니다. 즉, 풀에서 항상 유지될 최소 스레드 수입니다.
  • maximunPoolSize : 스레드 풀에서 허용할 수 있는 최대 스레드 수입니다. corePoolSize보다 큰 값으로 설정할 수 있으며, 최대 스레드 수까지 작업을 동시에 처리할 수 있습니다. 스레드 풀에서 작업이 급증할 경우, 이 값에 따라 추가 스레드가 생성됩니다.
  • keepAliveTime :스레드가 풀에서 유휴 상태로 유지될 최대 시간입니다. corePoolSize보다 많은 스레드가 생성되었을 때, 그 스레드가 유휴 상태로 이 시간만큼 유지된 후에는 종료됩니다.
  • TimeUnit : keepAliveTime의 시간 단위를 지정합니다.
  • BlockingQueue : 작업을 대기열에 저장하는 큐입니다. 스레드 풀에서 처리할 작업이 많아지면, 이 큐에 작업이 쌓이게 됩니다. 일반적으로 LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue 등이 사용되며, 각각의 특성에 맞게 선택합니다.
  • (선택) ThreadFactory : 새로운 스레드를 생성할 때 사용할 공장을 정의합니다. 기본적으로 Java는 표준 스레드 생성 방식을 사용하지만, 특정 스레드 명명 규칙을 사용하거나, 데몬 스레드를 생성해야 하는 경우 직접 구현할 수 있습니다.
  • (선택) RejectedExcutionHandler : 스레드 풀에서 작업을 처리할 수 없을 때의 처리 방식을 정의합니다.

(선택) 사항을 제외 하고는 모두 필수적인 사항으로, 테스트를 위해 아래의 셋팅으로 진행하여 코드를 작성해 보겠습니다.

package thread.executor;

import java.util.concurrent.*;

public class ExecutorServiceMain {

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));

        System.out.println(Thread.currentThread().getName() + " : Start!");

        Future<Boolean> result1 = es.submit(new CallableTask());
        Future<Boolean> result2 = es.submit(new CallableTask());
        Future<Boolean> result3 = es.submit(new CallableTask());

        System.out.println("result1 state() : " + result1.state());
        System.out.println("result2 state() : " + result2.state());
        System.out.println("result3 state() : " + result3.state());

        System.out.println(Thread.currentThread().getName() + " : End!");
    }

    private static class CallableTask implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " : Start!");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " : End!");

            return true;
        }
    }

}
main : Start!
result1 state() : RUNNING
result2 state() : RUNNING
result3 state() : RUNNING
pool-1-thread-2 : Start!
main : End!
pool-1-thread-1 : Start!
pool-1-thread-1 : End!
pool-1-thread-2 : End!
pool-1-thread-2 : Start!
pool-1-thread-2 : End!

예제1) 해석

  • 콘솔로그와 결과 먼저 분석해본다.
    • 몇가지 특이점이 존재한다.
      1. pool-1-thread-1이 두번 실행되었다.
      2. main은 시작로그를 기록하고, 종료가 실행되었다.
      3. 해당 프로그램은 thread에 넣어둔 3개의 작업이 완료되어도, 계속 실행 중이다.
    • 이를 해석하면 다음과 같다.
      1. 한번에 실행될 수 있는 스레드는 ThreadPoolExecutor를 생성할때 2로 설정 해두었다. 따라서 pool-1-thread-1재활용하여 es.submit() 세번째 구문에 재활용하여 사용하였다.
      2. submit()을 통해 실행한 다른 Thread는 non-blocking 방식으로 실행되었다. state()를 통해 확인 해보았더니, Running 상태로 확인 되었다.
      3. main thread는 마지막 End! 콘솔이 실행된 것으로 보아, 종료 되었지만, ThreadPoolExecutor에서 생성된 2개의 corePool은 계속 실행 중 이다.
  • 분석 결과, ThreadPoolExecutor의 여러가지 특징을 알 수 있었다.
  • 3개의 thread를 실행 요청하였더니, 순착적으로 2가지 먼저 실행되었고, 하나가 완료 된 후, BlockingQueue에 있는 작업이 진행 되었다. 이때, Thread는 새로 생성되지 않았고, 재활용하여 자원을 관리 하였다.

예제2) ThreadPool 종료와 get() 메서드로 결과값 읽어 오기.

package thread.executor;

import static util.MyLogger.log;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceMainV2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(2, 3, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<>());

        System.out.println(Thread.currentThread().getName() + " : Start!");

        Future<Boolean> result1 = es.submit(new CallableTask());
        Future<Boolean> result2 = es.submit(new CallableTask());
        Future<Boolean> result3 = es.submit(new CallableTask());

        System.out.println("result1.get() = " + result1.get());
        System.out.println("result2.get() = " + result2.get());
        System.out.println("result3.get() = " + result3.get());

        System.out.println("result1 state() : " + result1.state());
        System.out.println("result2 state() : " + result2.state());
        System.out.println("result3 state() : " + result3.state());

        printState(es);
        Thread.sleep(10000);
        System.out.println("10초 대기중...");
        printState(es);

        System.out.println(Thread.currentThread().getName() + " : End!");
        es.close();
    }

    private static class CallableTask implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " : Start!");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " : End!");

            return true;
        }
    }

    public static void printState(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queue = poolExecutor.getQueue().size();
            long completedTasks = poolExecutor.getCompletedTaskCount();
            System.out.println("[pool=" + pool + ", active=" + active + ", queueTasks=" + queue
                    + ", completedTasks" + completedTasks + "]");
        } else {
            System.out.println(executorService);
        }
    }
}
main : Start!
pool-1-thread-3 : Start!
pool-1-thread-1 : Start!
pool-1-thread-2 : Start!
pool-1-thread-1 : End!
pool-1-thread-3 : End!
pool-1-thread-2 : End!
result1.get() = true
result2.get() = true
result3.get() = true
result1 state() : SUCCESS
result2 state() : SUCCESS
result3 state() : SUCCESS
[pool=3, active=0, queueTasks=0, completedTasks3]
10초 대기중...
[pool=2, active=0, queueTasks=0, completedTasks3]
main : End!

예제2) 분석

  • 로그와 콘솔을 통해 분석을 해보겠다. 다음같은 특징이 있다.
    1. main thread는 모든 작업이 완료가 되고, 스레드 종료 되었다.
    2. get()을 실행하였더니, 잠시 log 출력이 중지되었다.
    3. get()을 통해 결과값을 확인 해 보니, true 가 확인 되었고, 이후 state()를 확인 했더니, SUCCESS 로 확인 되었다.
    4. ThreadPool 의 상태를 확인해 보았더니, Pool 갯수가 3개였고, 10초 뒤 확인 해 보니, Pool이 2개로 확인되었다.
    5. 프로그램이 정상 종료 되었다.
  • 이를 통해 우리는 ExecutorService의 특징을 조금 더 알 수 있다.
    1. 1~3 같이 설명
    2. 1~3 같이 설명
    3. 이전과 다르게 get()을 사용하면 결과값을 받을 수 있는데, 이는 호출한 thread의 결과값이 나올 떄 까지 대기 한다. 이는 blocking 방식으로 동작함을 알 수 있다.
    4. ThreadPoolExecutor의 설정 대로, 2개의 기본 pool을 설정 하였지만, 3개 이상의 작업이 들어와서 급한대로 1개의 추가 thread를 생성 하엿다. 또한, 생존 시간은 10초 이므로, 10초동안 생존하다가 이후 삭제되어 2개로 변경 된 모습을 볼 수 있다.

예제3) BlockingQueue의 종류에 따른 동작 관계

  • 예제2번 코드에서, ThreadPoolExcutor의 파라미터만 변경한 채 진행 예정이다.

Case1) ArrayBlockingQueue

ExecutorService es = new ThreadPoolExecutor(2, 3, 10, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
main : Start!
pool-1-thread-2 : Start!
pool-1-thread-1 : Start!
pool-1-thread-2 : End!
pool-1-thread-1 : End!
pool-1-thread-2 : Start!
result1.get() = true
result2.get() = true
pool-1-thread-2 : End!
result3.get() = true
result1 state() : SUCCESS
result2 state() : SUCCESS
result3 state() : SUCCESS
[pool=2, active=0, queueTasks=0, completedTasks3]
10초 대기중...
[pool=2, active=0, queueTasks=0, completedTasks3]
main : End!

Case2) LinkedBlockingDeque

ExecutorService es = new ThreadPoolExecutor(2, 3, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
main : Start!
pool-1-thread-1 : Start!
pool-1-thread-2 : Start!
pool-1-thread-2 : End!
pool-1-thread-1 : End!
pool-1-thread-2 : Start!
result1.get() = true
result2.get() = true
pool-1-thread-2 : End!
result3.get() = true
result1 state() : SUCCESS
result2 state() : SUCCESS
result3 state() : SUCCESS
[pool=2, active=0, queueTasks=0, completedTasks3]
10초 대기중...
[pool=2, active=0, queueTasks=0, completedTasks3]
main : End!

Case3) SynchronousQueue, maxThread 2 설정

ExecutorService es = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
main : Start!
pool-1-thread-2 : Start!
pool-1-thread-1 : Start!
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@30dae81[Not completed, task = thread.executor.ExecutorServiceMainV2$CallableTask@7cd84586] rejected from java.util.concurrent.ThreadPoolExecutor@5b6f7412[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
	at thread.executor.ExecutorServiceMainV2.main(ExecutorServiceMainV2.java:25)
pool-1-thread-1 : End!
pool-1-thread-2 : End!

예제3) 분석

Case1)

  • 먼저 ArrayBlockingQueue를 사용한 결과의 분석이다.
    1. thread-1, thread-2 만 실행되었다.
    2. thread의 풀은 10초전후 상관없이 2개만 생성되었다. 완료된 작업은 3개이다.
  • 따라서 이 내용을 통해 특징을 설명하면 다음과 같다.
    1. Blocking Queue에 담는것이, maxThread 값 까지 생성한는 것보다 우선 순위이다.

Case2)

  • LinkedBlockingDeque의 결과
    1. ArrayBlockingQueue와 동일하게 thread-1, thread-2 만 생성되었고, thread-3는 생성되어 관리 되지 않았다.
  • LinkedList와 동일하게 LinkedBlockingDeque는 링크 구조를 가진 자료 구조로, Capacity가 무한대로, 해당 큐가 가득차는 경우는 없다.
  • 따라서, 다음 우선순위인 maxThread 값을 사용해서 새로운 Thread를 만드는 경우는 절대 없다.

Case3)

  • LinkedBlockingDeque의 결과
    1. thread1, thread2를 실행하고 thread3을 실행할 때, RejectedException이 발생하였다.
    2. 이에 따라, main 로직은 es.close()를 실행하지 못하고, 무한 대기 상태에 빠져 버렸다.
  • 이를 통해, 큐에 넣을 수도 없고, maxThread 까지 생성을 해도 입력받는 작업을 수행 하거나 대기 할 수 없는 경우, Exception 이 발생하는 것을 확인 할 수 있다.

정리

  • 위의 내용대로, ThreadPoolExecutor은 다양한 특징을 가지고 있다.

  • 또한, 사용자의 서비스 및 시스템 상황에 맞게 여러가지 설정을 변경하며 Thread Pool 전략을 변경 할 수 있다.

    • Thread Pool 전략은 다음에 정리 예정.
  • 특징의 종류

    1. 기본 Non-Blocking 지원. 기능에 따라 Blocking으로 변환 가능
    2. Thread Pool의 설정 및 Blocking Queue의 종류로 다양한 방법의 Thread 제어가 가능

추가적인 메서드 (기능)

  • 여러가지 메서드를 지원하지만, 유용하게 사용하는 몇가지 메서드를 소개한다.

invokeAll()

  • 모든 Callable 작업을 제출하고 모든 작업이 완료 될 때 까지 기다린다.
  • Blocking.

invokeAny()

  • 모든 Callable 작업 중, 하나의 Callable 작업이 완료 될때까지 대기, 가장 먼저 완료된 작업의 결과값을 반환한다.
  • 완료되지 않은 나머지 작업은 취소된다.
profile
현재 블로그 이전 중입니다. https://blog.baekgwa.site/

0개의 댓글