스레드 풀 & Executor

이동건 (불꽃냥펀치)·2025년 1월 20일
0

스레드를 사용할 때 생기는 문제

  1. 스레드 생성 시간으로 인한 성능문제
  2. 스레드 관리 문제
  3. Runnable 인터페이스의 불편함

1,2번 문제를 해결하기 위해 스레드 풀이라는 기술이 도입됨

  • 스레드를 관리하는 스레드 풀에 스레드를 미리 필요한 만큼 만들어둔다
  • 스레드가 스레드풀에서 대기할 때 작업요청이 옴
  • 스레드풀에서 이미 만들어진 스레드를 하나 조회
  • 스레드 1은 작업을 완료 후 종료하는 게 아닌 다시 스레드 풀에 반납된다
  • 스레드 1은 이후에 재사용될 수 있다



Executor

Executor 인터페이스

 package java.util.concurrent;
 public interface Executor {
     void execute(Runnable command);
}

주요 메서드

 public interface ExecutorService extends Executor, AutoCloseable {
     <T> Future<T> submit(Callable<T> task);
     @Override
     default void close(){...}
... }

Runnable 인터페이스 구현

public class RunnableTask implements Runnable {
    private final String name;
    private int sleepMs = 1000;
    public RunnableTask(String name) {
        this.name = name;
	}
    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
	}
    @Override
    public void run() {
			log(name + " 시작"); sleep(sleepMs); 
            // 작업 시간 시뮬레이션 
            log(name + " 완료");
	} 
}

ExecutorService 구현

public class ExecutorBasicMain {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(2,2,0,TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<>()); 
        log("== 초기 상태 ==");
        printState(es);
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("taskD"));
        log("== 작업 수행 중 =="); printState(es);
		sleep(3000);
		log("== 작업 수행 완료 =="); printState(es);
		es.close();
		log("== shutdown 완료 =="); printState(es);
	} 
}

  • ExectorService의 가장 대표적인 구현체는 ThredPoolExecutor이다
  • ThreadPoolExecutor(ExecutorService)는 크게 2가지 요소가 있다
    • BlockingQueue:작업을 보관한다
    • ThreadPool: 스레드를 보관한다

ThreadPoll Executor 생성자

  • corePoolSize: 스레드 풀에서 관리하는 기본적인 스레드의 수
  • maximunPoolSoze: 스레드 풀에서 관리하는 최대 스레드의 수
  • keepAlivetime, TimeUnit unit:기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기시간 이시간동안 처리할 작업이 없으면 초과 스레드는 제거된다
  • BlockingQueue workQueue: 작업을 보관할 블로킹 큐

Runnable의 불편함

  • 반환값이 없음: run()메서드는 반환값을 가지지 않는다.따라서 실행결과를 얻기 위해서는 별도의 메카니즘을 실행해야한다.
  • 예외처리: run()메서드는 체크예외를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야한다.
  • ExecutorService는 이런 문제를 깔끔하게 해결해준다

Callable & Future

public class CallableMainV1 {
     public static void main(String[] args) throws ExecutionException,InterruptedException {
         ExecutorService es = Executors.newFixedThreadPool(1);
         Future<Integer> future = es.submit(new MyCallable());
         Integer result = future.get();
         log("result value = " + result);
         es.close();
     }
     static class MyCallable implements Callable<Integer> {
         @Override
		public Integer call() {
			log("Callable 시작");
			sleep(2000);
			int value = new Random().nextInt(10); 
            log("create value = " + value); 
            log("Callable 완료");
			return value;
		}
	}
  • newFixedThredPool(size)을 사용하면 편리하게 ExectorService를 구현할 수 있다
  • MyCallable은 숫자를 반환하므로 반환할 제네릭 타입을 <Integer>로 선언했다
  • Runnable과 비슷하지만 반환값을 가진다는 점이 차이점이다 따라거 결과를 보관할 별도의 필드를 만들지 않아도된다
Future<Integer> future = es.submit(new MyCallable());
  • submit() 메서드를 통해 Callable을 작업으로 전달 할 수 있다
  • MyCallable의 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것이다. 이때 작업의 처리 결과는 직접 반환되는 것이 아닌 Future라는 특별한 인터페이스로 전달된다.
Integer result = future.get();
  • future.get() 을 호출하면 MyCallablecall() 이 반환한 결과를 받을 수 있다.


Future 분석

Future<Integer> future = es.submit(new MyCallable());
  • submit()의 호출로 MyCallable의 인스턴스를 전달한다
  • 이때 submit()MyCallable.call()이 반환하는 무작위 숫자 대신에 Future을 반환
  • MyCallable이 즉시 실행되어 결과를 반환하는 것은 불가능하다. 그 이유는 MyCallable은 즉시 실행되는 것이 아닌 스레드풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행해야하기 때문이다.
  • 따라서 결과를 즉시 받는 것은 불가능하다. 이런 이유로 es.submit()MyCallable 의 결과를 반환하는 대 신에 MyCallable 의 결과를 나중에 받을 수 있는 Futurue 라는 객체를 대신 제공한다.

  • 요청 스레드는 submit()을 호출받은 경우 Future가 만들어지고 전달한 작업인 taskA가 바로 블로킹 쿠에 담기는 것이 아니라 그림처럼 taskA를 감싼 Future가 대신 블로킹 큐에 담긴다.

  • 큐에 들어있는 Furute를 꺼내서 스레드 풀의 스레드1이 작업을 시작한다
  • 스레드1은 FutureTaskrun() 메서드를 수행한다
  • 그리고 run() 메서드가 taskAcall() 메서드를 호출하고 그 결과를 받아서 처리한다
  • 요청 스레드는 작업의 결과가 필요해서 future.get() 을 호출한다.
    • taskA의 작업이 완료되면 Future의 상태도 완료로 변경
    • 요청 스레드가 future.get() 을 호출하면 Future 가 완료 상태가 될 때 까지 대기한다. 이때 요청 스레드의 상태는 RUNNABLE ->WAITING 이 된다.
  • Future가 완료 상태가 아님: taskA 가 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한 다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹(Blocking)이라 한다.

Future 잘못 사용

Future<Integer> future1 = es.submit(task1); //non-blocking 
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
Integer sum1 = es.submit(task1).get(); // get()에서 블로킹 
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹

주요 메서드

boolean cancel(boolean mayInterruptIfRunning)

  • 기능: 아직 완료되지 않은 작업을 취소한다
  • 매개변수: mayInterruptIfRunning
    • cancel(true): Future를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.interrupt()를 호출해서 작업을 중단한다.
    • cancel(false): Future를 취소 상태로 변경한다. 단 작업중인 작업을 중단하지는 않는다
  • 반환값: 작업이 성공적으로 취소된 경우 true , 이미 완료되었거나 취소할 수 없는 경우 false

boolean isCancelled()

  • 기능: 작업이 취소되었는지 여부를 확인한다.
  • 반환값: 작업이 취소된 경우 true , 그렇지 않은 경우 false
  • 설명: 이 메서드는 작업이 cancel() 메서드에 의해 취소된 경우에 true 를 반환한다.

boolean isDone()

  • 기능: 작업이 완료되었는지 여부를 확인한다.
  • 반환값: 작업이 완료된 경우 true , 그렇지 않은 경우 false
  • 설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true 를 반환한다.
    State state()
  • 기능: Future 의 상태를 반환한다. 자바 19부터 지원한다.
    RUNNING : 작업 실행 중 SUCCESS : 성공 완료 FAILED : 실패 완료 CANCELLED : 취소 완료

V get()

  • 기능: 작업이 완료될 때까지 대기하고, 완료되면 결과를 반환한다. 반환값: 작업의 결과
  • 예외
    • InterruptedException : 대기 중에 현재 스레드가 인터럽트된 경우 발생
    • ExecutionException : 작업 계산 중에 예외가 발생한 경우 발생
  • 설명: 작업이 완료될 때까지 get() 을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한 다.

취소

public class FutureCancelMain {
	private static boolean mayInterruptIfRunning = true; // 변경
	//private static boolean mayInterruptIfRunning = false; // 변경
     public static void main(String[] args) {
         ExecutorService es = Executors.newFixedThreadPool(1);
         Future<String> future = es.submit(new MyTask());
         log("Future.state: " + future.state());
// 일정 시간 후 취소 시도 
		sleep(3000);
// cancel() 호출
		log("future.cancel(" + mayInterruptIfRunning + ") 호출");
		boolean cancelResult1 = future.cancel(mayInterruptIfRunning); 				
        log("Future.state: " + future.state());
		log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult1);
// 결과 확인 
		try {
			log("Future result: " + future.get());
			} catch (CancellationException e) { // 런타임 예외
			log("Future는 이미 취소 되었습니다.");
			} catch (InterruptedException | ExecutionException e) {
             e.printStackTrace();
         }
// Executor 종료
         es.close();
     }
     static class MyTask implements Callable<String> {
		@Override
		public String call() {
    		try {
        		for (int i = 0; i < 10; i++) {
				log("작업 중: " + i);
				Thread.sleep(1000); // 1초 동안 sleep }
				} catch (InterruptedException e) { log("인터럽트 발생");
					return "Interrupted";
				}
   				 return "Completed";
		}
	} 
}
  
  • cancel(true) : Future 를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.interrupt()
    호출해서 작업을 중단한다.
  • cancel(false) : Future 를 취소 상태로 변경한다. 단 이미 실행 중인 작업을 중단하지는 않는다.



ExecutorService - 작업 컬렉션 처리

ExecutorService 는 여러 작업을 한 번에 편리하게 처리하는 invokeAll() , invokeAny() 기능을 제공한다.

public class InvokeAllMain {
    public static void main(String[] args) throws ExecutionException,
InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        CallableTask task1 = new CallableTask("task1", 1000);
        CallableTask task2 = new CallableTask("task2", 2000);
        CallableTask task3 = new CallableTask("task3", 3000);
        List<CallableTask> tasks = List.of(task1, task2, task3);
        List<Future<Integer>> futures = es.invokeAll(tasks);
        for (Future<Integer> future : futures) {
            Integer value = future.get();
            log("value = " + value);
        }
        es.close();
    }
}

invokeAny() 는 한 번에 여러 작업을 제출하고, 가장 먼저 완료된 작업의 결과를 반환한다. 이때 완료되지 않은 나머 지 작업은 인터럽트를 통해 취소한다.

public class InvokeAnyMain {
     public static void main(String[] args) throws ExecutionException,
 InterruptedException {
         ExecutorService es = Executors.newFixedThreadPool(10);
         CallableTask task1 = new CallableTask("task1", 1000);
         CallableTask task2 = new CallableTask("task2", 2000);
         CallableTask task3 = new CallableTask("task3", 3000);
         List<CallableTask> tasks = List.of(task1, task2, task3);
         Integer value = es.invokeAny(tasks);
         log("value = " + value);
         es.close();
	} 
}



Executors 종료 메서드

서비스 종료

  • void shutdown()
    • 새로운 작업을 받지않고, 이미 제출된 작업을 모두 완료한 후에 종료한다
    • non blocking 메서드
  • List<Runnable> shutdownNow()
    • 실행중인 작업을 중단하고 대기중인 작업을 반환하며 즉시 종료한다
    • 실행 중인 작업을 중단하기 위해 인터럽트를 발생시킴
    • non blocking 메서드

서비스 상태 확인

  • boolean isShutdown()
    • 서비스가 종료되었는지 확인한다
  • boolean isTerminated()
    • shutdown(),shutdownNow() 호출후 모든 작업이 완료되었는지 확인

작업 완료 대기

  • boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException
    • 서비스 종료시 모든 작업이 완료될 때 까지 대기한다. 이때 지정된 시간까지만 대기한다.
    • blocking method

shutdown() - 처리중인 작업이 없는 경우

  • ExecutorService에 아무런 작업이 없고 스레드만 대기중인 상태
  • shutdown()을 호출 시 ExecutorService는 새로운 요청을 거절한다
    • 거절시 기본적으로 java.util.concurrent.RejectedExecutionException 예외가 발생한다
  • 스레드 풀의 자원을 정리한다

shutdown() - 처리중인 작업이 있는 경우

  • shutdown()을 호출 시 ExecutorService는 새로운 요청을 거절한다
  • 스레드 풀의 스레드는 처리중인 작업을 완료한다
  • 스레드 풀의 스레드는 큐에 남아있는 작업도 모두 꺼내서 완료한다

shutdownNow() - 처리중인 작업이 있는 경우

  • shutdown()을 호출 시 ExecutorService는 새로운 요청을 거절한다
  • 큐를 비우면서 큐에있는 작업들을 모두 꺼내서 컬렉션으로 반환한다
    • List<Runnable> runnables = es.shutdownNow()
  • 작업 중인 스레드에 인터럽트가 발생한다
    • 이때 작업중인 스레드의 경우 인터럽트가 걸린다
    • 큐에 대기중인 스레드는 수행되지 않는다
  • 작업을 완료하면 자원을 정리한다


Executor 스레드 풀 관리

ExecutorService의 기본 구현체인 ThreadPoolExecutor 생성자의 속성

  • corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수
  • maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime , TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
  • BlockingQueue workQueue : 작업을 보관할 블로킹 큐

그림으로 스레드 풀관리 알아보자

 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(2, 4, 3000,
TimeUnit.MILLISECONDS, workQueue);
public class RunnableTask implements Runnable {
     private final String name;
     private int sleepMs = 1000;
     public RunnableTask(String name) {
         this.name = name;
	}
     public RunnableTask(String name, int sleepMs) {
         this.name = name;
         this.sleepMs = sleepMs;
	}
     @Override
     public void run() {
		log(name + " 시작"); 
        sleep(sleepMs); // 작업 시간 시뮬레이션
        log(name + " 완료");
	}
}
  • 기본 설정은 블로킹 큐의 길이를 2로 설정해, 최대 2개까지 작업을 큐에 보관할 수 있다
  • corePoolSize=2 , maximumPoolSize=4 를 사용해서 기본 스레드는 2개 최대 스레드는 4개로 설정했다
  • 기본 스레드 수를 초과해서 만들어진 스레드가 3초동안 작업을 하지 않고 대기한다면 초과 스레드는 스레드 풀에서 제거된다
  • 이 상황에서 Runnable구현체를 7개 만들어서 동작 시켰다

  • 스레드 풀에서 대기중인 스레드 1개를 가져와서 작업을 수행중인 상태이다

  • 대기중인 스레드가 존재하지 않아 요청온 작업은 대기쿠에서 waiting상태로 기다리고 있다

  • 큐에 보관을 시도하지만 큐가 가득찬 상태이다
  • 이 경우 Executor는 max(maximumPoolSize)사이즈까지 초과스레드를 만들어서 작업을 수행한다
  • Executor는 초과스레드인 스레드3을 만들어서 바로 작업을 지시한다
  • 이때 초과 스레드를 생성했기 때문에 굳이 블로킹 큐에 대기할 필요는 없다

  • 큐가 가득차고 초과스레드도 생성할 수 없는 상태이다
  • 이 경우 큐에 넣을 수도 없고 작업을 수행할 스레드도 만들 수 없어 작업을 거절한다
    • RejectedExecutionException 이 발생한다

  • 작업을 수행하던 스레드가 모든 작업을 마치고 스레드풀로 복귀 후 블로킹 큐에 대기중인 작업까지 끝마쳤다
  • 초과 생성된 스레드는 3초동안 할당된 작업이 없어서 사라진다



Executor 스레드 풀 관리 전략

자바는 Executors 클래스를 통해 3가지 기본 전력을 제공한다.

  • newSingleThreadPool(): 단일 스레드 풀 전략
  • newFixedThreadPool(nThreads): 고정 스레드 풀 전략
  • newCachedThreadPool(): 캐시 스레드 풀 전략

고정 풀 전략

newFixedThreadPool(nThreads)

  • 스레드 풀에 nThreads 만큼의 기본 스레드를 생성하며 초과 스레드는 생성하지 않는다
  • 큐 사이즈에 제한이 없다
  • 스레드 수가 고정되어 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식
public static void main(String[] args) throws InterruptedException {
         ExecutorService es = Executors.newFixedThreadPool(2);
      
		og("pool 생성"); 
        printState(es);
         for (int i = 1; i <= 6; i++) {
             String taskName = "task" + i;
             es.execute(new RunnableTask(taskName));
             printState(es, taskName);
		}
		es.close();
		log("== shutdown 완료 ==");
}

캐시풀 전략

newCachedThreadPool()

  • 기본 스레드를 사용하지 않고 60초 생존 주기를 가진 초과 스레드만 사용
  • 초과 스레드의 수는 제한이 없음
  • 큐에 작업을 저장하지 않음
    • 대신 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리
  • 모든 요청이 대기하지 않고 스레드가 바로 처리







출처: https://www.inflearn.com/course/%EA%B9%80%EC%98%81%ED%95%9C%EC%9D%98-%EC%8B%A4%EC%A0%84-%EC%9E%90%EB%B0%94-%EC%A4%91%EA%B8%89-1

profile
자바를 사랑합니다

0개의 댓글

관련 채용 정보