[Java 멀티스레드] ThreadPool과 Executor, ExecutorService 그리고 Future

벼랑 끝 코딩·2025년 3월 10일

Java MultiThread

목록 보기
5/6
post-thumbnail

자바 스레드에 대한 개념을 배우면서 동시에 여러 작업이 가능해졌다!

그렇다면 스레드는 동시에 많은 일을 수행 가능하게 해주는 완벽한 존재인걸까?
아쉽게도 스레드에도 약점은 존재한다.

Thread 문제점

Thread가 가지고 있는 문제점을 살펴보자.

자원 소모

아쉽게도 혁명과 같은 스레드는 우수한 기능을 가진 만큼 비용을 소모한다.

  • 스택 영역에 메모리를 추가로 할당해야 한다.
  • 운영체제 스케줄러에 의해 관리된다.
  • 이처럼 메모리, CPU와 같은 운영체제 자원을 사용한다.
Thread thread = new Thread(new Runnable());

Thread는 다음과 같이 객체를 직접 생성하게 되는데,
thread 생성 코드를 루프 내부에 삽입했다가
문제가 발생하여 대량의 스레드가 생성되는 상황을 생각해보자.

자원을 사용하는 만큼 제한 없이 늘어난 스레드는 서버 다운과 같은 심각한 문제로 이어질 것이다.

Runnable 단점

Thread thread = new Thread(new Runnable());

우리는 Thread 대신 Runnable을 사용하기로 했다.
하지만 Runnable에는 치명적인 단점이 존재한다.

void 타입 메서드

class Task implements Runnable {

	int value;

	@Override
	public void run() {
		// 메서드 바디
	}
}

Runnable을 구현하여 스레드를 생성할 경우, run() 메서드를 오버라이딩해야 한다.
run() 메서드의 반환 타입은 void이다.

따라서 Runnable에서 연산을 수행하여 결과 값을 얻고 싶다면,
해당 객체에 직접 접근하여 값을 꺼내야 한다.

Task task = new Task();
Thread thread = new Thread(task);

int result = task.value;

Exception throws 불가

public interface Runnable {
    void run();
 }

Runnable 인터페이스의 run() 메서드는 (체크)예외를 던지지 않는다.
따라서 Runnable을 구현하는 클래스도 마찬가지로 예외를 던질 수 없다.
(부모가 던지는 예외의 하위 예외만 자식은 던질 수 있다)

즉, 체크 예외를 모두 잡아야만 하는 번거로움이 있다.


사실상 이 단점들을 극복하지 못하면 사용하기 매우 위험하기 때문에
반드시 해결책 또는 대안이 필요하다.

Executor

자바에서는 스레드 약점을 극복하기 위해 Executor, ExecutorService를 지원한다.

Executor

public interface Executor {
    void execute(Runnable command);
}

Thread, Runnable에서는 run() 메서드를 사용했지만,
Executor 인터페이스를 구현하면 스레드를 실행하기 위해 execute() 메서드를 사용한다.
하지만 스레드를 실행하기 위해서는 추가적인 과정을 거쳐야한다.

이 때 등장하는 것이 바로 ExecutorService이다.

ExecutorService

Thread 단점 중 하나는 자원을 소모하는데
자원 소모에 의해 발생하는 위험에 그대로 노출되는 것이었다.
그렇기에 Thread는 관리되어야 한다.
Thread를 관리하기 위한 방법이 바로 ThreadPool이다.

ThreadPool은 말 그대로 스레드를 저장 및 관리할 수 있는 공간을 말한다.
ExecutorService 인터페이스는 Executor 인터페이스를 확장한 것인데,
ExecutorService의 구현체인 ThreadPoolExecutor로 ThreadPool을 생성할 수 있다.

ExecutorService executorService = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int keepAliveTime, TimeUnit unit,
new BlockingQueue(), new ThreadPoolExecutor.POLICYTYPE())

Executor에 이은 확장 버전 ExecutorService,
그리고 구현체인 ThreadPoolExecutor까지 도달했다.
ThreadPoolExecutor을 먼저 파헤치고 본격적으로 ExecutorService의 활용법을 알아보자.

ThreadPoolExecutor

ThreadPoolExecutor, 스레드 풀로 스레드를 관리하면 뭐가 좋은걸까?
자원을 소모하는 무거운 존재인 스레드는 생성 시에 일반적인 작업 이상의 시간이 걸린다.
스레드를 대량으로 생성해야 하는 경우 상당한 시간이 걸리기 때문에,
스레드를 미리 생성하고 관리하면 그 시간을 절약할 수 있다.
미리 생성한 스레드의 상태 또한 자유롭게 관리하면서 스레드를 더욱 유연하게 통제할 수 있다.

그럼 스레드 풀을 구성하는 파라미터에 대해 알아보자.

corePoolSize

스레드 풀에서 관리하는 기본 스레드의 수를 의미한다.
corePoolSize가 2인 경우, 스레드 풀 내에는 2개의 스레드가 생성될 수 있다.

스레드 풀은 초기 비어 있는 상태에서 기본적으로 작업이 들어와야 스레드가 생성되지만,
ThreadPoolExecutor의 prestartAllCoreThreads() 메서드를 사용하여
작업이 들어오기 전에 스레드를 미리 생성할 수 있다.

또한 생성된 기본 스레드는 작업을 수행하지 않고 시간이 지나도 없어지지 않지만,
ThreadPoolExecutor의 allowCoreThreadTimeOut(true) 메서드를 사용하여
기본 스레드도 일정 시간 동안 작업을 수행하지 않으면 제거할 수 있다.

maximumPoolSize

스레드 풀에서 관리되는 최대 스레드의 수를 의미한다.
기본 스레드가 모두 작업을 처리하고 있다면, 초과하는 작업은 BlockingQueue에 보관된다.

BlockingQueue 사이즈 마저 초과하는 작업이 들어온다면,
기본 스레드 수(corePoolSize) 보다 최대 스레드의 수(maximumPoolSize)를 더 크게 설정하여
스레드를 추가로 생성하고 BlockingQueue에 밀린 작업을 수행할 수 있다.

===== CASE =====
corePoolSize = 2;  // 기본 스레드 2개
maximumPoolSize = 3;  // 최대 스레드 3개
queue.size() = 2;  // 큐 공간 2개

** task 5개 부여 상황 **

===== PROCESS =====

1. 기본 스레드 = 2 task 수행 (남은 task = 3)
2. queue = 2 task 보관 (남은 task = 1)

// 기본 스레드와 Queue의 공간을 모두 사용하고도 task가 남아 있음(남은 task = 1)
3. 초과 스레드 생성(maximumPoolSize = 3 도달)

4. 초과 스레드 = 1 task 수행 (남은 task = 0)
5. 2개의 기본 스레드 작업 완료, queue의 2 task 각각 수행

주의 사항

LinkedBlocingQueue와 같이 Queue의 공간이 무한대인 경우 maximumPoolSize는 의미가 없다.
사용하는 Queue 종류와 Queue Size를 확인하고 maximumPoolSize를 설정해야 한다.

keepAliveTime

스레드 생존 시간을 의미한다.
기본 스레드가 모두 작업을 처리하고 Queue의 공간도 가득 차서 생성한 초과 스레드 또는
작업이 너무 없어 놀고 있는 기본 스레드는 설정에 따라 keepAliveTime이 지나면 제거될 수 있다.

BlockingQueue

기본 스레드가 모두 작업을 수행하고 있을 때 초과로 요청된 작업을 보관하는 Queue이다.
멀티스레드 상황에서 사용할 수 있는 BlockingQueue를 파라미터로 전달해야 한다.

ThreadPoolExecutor.POLICYTYPE()

기본 스레드가 모두 작업을 수행하고,
Queue의 공간도 모두 꽉 찬 상태에서
maximumPoolSize에 따라 생성한 초과 스레드도 모두 작업을 수행중이라면?

작업을 거절해야 한다. 안되는 것을 되게 할 수는 없다!
어떻게 거절할 지 대응하는 방식을 거절 정책이라고 한다.

  • AbortPolicy()

default 거절 정책이다.
예외(RejectedExecutionException)이 발생되며 작업을 거절한다.

  • DiscardPolicy()

버리는(Discard) 거절 정책이다.
말 그대로 초과로 요청된 작업을 아무런 조치 없이 버린다.
실제로도 아무런 코드 구현이 되어있지 않다.

 public static class DiscardPolicy implements RejectedExecutionHandler {
 	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    	// ** 메서드 바디 없음 **
    }
}
  • CallerRunsPolicy()

요청한 스레드(Caller)가 대신 작업을 수행(Runs)하는 정책이다.
요청한 스레드는 새로운 작업을 제출하는 스레드이기 때문에,
작업을 제출하는 속도가 저하되면서 스레드의 수와 작업의 수를 맞출 수 있다.
CallerRunsPolicy() 정책은 작업을 수행하는 스레드가 종료된 후에도 계속해서 작업을 수행하기 때문에,
종료된 스레드는 작업을 수행하지 않도록 설계해야 한다.

  • 사용자 정의 정책

RejectedExecutionHandler 인터페이스를 구현하여 거절 정책을 직접 만들 수 있다.

ThreadPool 적정 스레드 수

그렇다면 스레드는 얼마나 만들어 두는 것이 좋을까?
기본 스레드, 최대 스레드, 큐 사이즈는 어떻게 설정하면 좋을까?
스레드 풀을 설정하는 것을 풀 전략이라고 한다.

단일 스레드 풀 전략

ExecutorService executorService = new SingleThreadPool();

스레드 풀에 스레드가 1개인 것을 의미한다.
LinkedBlockingQueue를 사용하여 큐 사이즈에는 제한이 없다.
보통 테스트용으로 사용한다.

고정 풀 전략

ExecutorService executorService = new FixedThreadPool(size);

스레드 개수를 고정하여 관리하는 전략을 의미한다.
LinkedBlockingQueue를 사용하여 큐 사이즈에는 제한이 없다.
스레드가 고정되어 있기 때문에, CPU와 메모리 리소스가 예측이 가능한 안정적인 방식이다.

사용자가 늘어날 경우 Queue에 작업은 계속 쌓이게 되는데,
스레드 수는 고정적이어서 성능의 저하로 이어지게 되는 단점이 있다.

캐시 풀 전략

ExecutorService executorService = new CachedThreadPool();

기본 스레드 없이 필요할 때만 캐시로 스레드를 사용하는 것을 의미한다.
Queue에 저장 공간을 두지 않고 바로 스레드에게 처리를 요청하는
SynchronousQueue를 사용하며 최대 스레드에 제한이 없어 작업이 추가될 때마다 스레드가 생성된다.
스레드의 생존 주기는 60초이다.

필요할 때만 스레드를 생성하여 최적화할 수 있다는 장점이 있지만,
작업이 급격하게 증가하는 경우 스레드가 과다 생성되어 서버가 다운될 수 있는 위험이 있다.

이상적인 풀 전략

그렇다면 어떤 풀 전략을 선택해서 운영해야 할까?
정답은 테스트용 풀 전략인 단일 스레드 풀 전략을 제외하고,
고정 스레드 풀 전략과 캐시 풀 전략을 혼합하여 사용하는 것이다.

어떻게 두 전략을 합쳐서 사용할 수 있을까?
먼저 기본 스레드를 설정하여 고정적으로 사용할 수 있는 스레드를 확보한다.
기본 스레드만 고정적으로 사용할 경우, 성능이 저하될 수 있기 때문에
작업이 많은 위급한 상황인 경우 초과 스레드를 생성하여 상황에 대처한다.
초과 스레드를 무한으로 생성하는 경우 서버가 다운될 수 있으니,
초과 스레드는 이상적인 숫자를 설정하여 적절하게 제한한다.

기본 스레드, 초과 스레드.
어디서 많이 들어본 단어이다.
그렇다, 앞서 ThreadPoolExecutor의 파라미터인 corePoolSize와 maximumPoolSize이다.
두 스레드 전략을 혼합하여 생성할 수 있는 객체가 바로 ThreadPoolExecutor이다!

ExecutorService executorService = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int keepAliveTime, TimeUnit unit,
new BlockingQueue(), new ThreadPoolExecutor.POLICYTYPE())

트래픽에 따라 적절한 스레드 수인 corePoolSize를 설정하고
작업이 많은 상황을 대처하면서 서버가 안정적으로 운영될 수 있도록
초과로 생성할 수 있는 maximumPoolSize를 설정한다.
스레드의 생명 주기를 적절하게 설정한다.
Queue 사이즈를 설정하여 Queue 사이즈를 초과하는 경우 거절 정책을 활용할 수 있도록 한다.

이제 스레드 풀에 스레드를 생성하여 자원 소모 시 발생하는 스레드의 약점을 극복할 수 있다!

Callable

Executor를 확장한 ExecutorService를
ThreadPoolExecutor 구현체를 통하여 객체 생성할 수 있었다.
이제 스레드 풀에 생성된 스레드를 execute() 메서드를 호출하면
Thread, Runnable의 run() 메서드를 호출하는 것과 같이 스레드를 실행할 수 있다.

하지만 스레드를 실행하려면 작업을 스레드 풀에 전달해야 한다.
또한 자원 소모에서 오는 스레드의 약점은 극복했지만,
여전히 Runnable의 void 타입과 Exception을 throws하지 못하는 단점을 극복하지 못했다.

사실상 사용하기 어려운 Runnable을 쓸 필요가 있을까?

Runnable → Callable

Runnable은 반환 타입이 없어 불편하고, 예외도 모두 잡아야해서 번거롭다.
이참에 아예 새롭게 개편해버리자.
그렇게 등장한 것이 바로 Callable이다.

public interface Callable<V> {
    
    V call() throws Exception;
}

Callable은 run() 메서드가 아닌 call() 메서드를 호출한다.
Runnable에는 없던 반환 타입이 등장했으며, 예외 또한 던질 수 있다.

이제 Runnable 인터페이스 대신 Callable 인터페이스를 구현하면 된다!

반환 결과 get

이제 우리는 Callable을 구현하여 스레드에서 생성된 결과물, 즉 반환 타입을 받아볼 수 있다.
신이나서 싱글벙글하며 계산기 작업을 수행하는 스레드를 만들었고 결과를 출력하려고 한다.

여기서 문제가 발생한다.
호출한 스레드와 생성된 스레드는 동시에 실행된다.
너무 복잡한 계산을 시켜서 계산이 오래 걸리는 경우,
호출한 스레드가 결과를 반환받는 시점에 계산이 끝나지 않았다면 어떻게 될까?

결과를 기다려야 하지 않을까?
계산 후 결과를 반환하는 스레드를 기다려야 한다면,
동시에 실행되지 않는 이 상황은 멀티스레드라고 할 수 있을까?

이 문제를 해결하기 위한 것이 바로 Future이다.

Future

Future, 말 그대로 미래에 값을 가지는 존재를 의미한다.

계산이 복잡한 경우 결과를 바로 반환받을 수 없다.
그렇다고 해서 얼마나 시간이 필요한지 모르는 스레드를 위해
생성과 동시에 기다릴 수도 없다. 그건 동시에 실행되는게 아니다!

따라서 스레드의 생성과 결과를 반환하는 주체를 분리하여,
멀티스레드 환경을 유지하고 결과도 반환받을 수 있는 환경을 구축하는 것이다.

우리는 Callable을 통해 작업을 생성해서 스레드 풀에 제출하면
스레드는 결과로 Future을 반환한다.
이 과정에서 스레드의 세상은 멈추지 않는다.
우리가 Future을 통해 결과를 반환하려고 할 때,
아직 계산이 끝나지 않아 Future에 결과가 없을 때에만 일시적으로 호출한 스레드가 멈춘다.
동시에 실행되면서 결과를 반환받을 수도 있는 환경이 드디어 만들어졌다.

Future이 가지고 있는 메서드에 대해 알아보자.

get(), get(long timeout, TimeUnit unit)

Callable 작업이 수행한 결과를 반환할 수 있는 메서드이다.
반복해서 말한 것처럼, 결과를 반환하는 계산이 완료되지 않은 경우 호출한 스레드는 멈춘다.
이것을 실행을 막는다고 해서 블로킹(Blocking) 메서드라고 한다.

future.get();  // 계산이 완료되지 않음
System.out.println("계산 완료");  // ** 계산 완료가 출력되지 않음 **

대기 시간을 파라미터로 전달하여 원하는 시간 만큼만 결과를 기다릴 수 있다.

생성과 결과 반환 분리

동시에 작업을 수행하기 위해서 생성과 결과를 반환하는 작업을 분리했다.
작업을 수행하기 위한 스레드를 생성하는 것은 블로킹 메서드가 아니지만,
결과를 반환하는 메서드는 블로킹 메서드이다.

따라서 스레드가 여러 개인 경우, 모든 스레드를 생성하고 결과를 반환해야 한다.

// 스레드1 생성
future1.get();  // Blocking 메서드

// 스레드2 생성
future2.get();

이 경우 스레드 2가 생성되기 전에 코드는 Blocking되어 스레드2를 생성하지 못하고
결국에 스레드가 동시에 실행될 수 없다.

// 스레드1 생성
// 스레드2 생성

future1.get();
future2.get();

모든 스레드의 생성을 선행하고 결과를 반환 받아야만
Blocking된 상태에서도 다른 스레드가 영향을 받지 않고 동시에 실행될 수 있다!

cancel()

스레드 동작을 취소하는 메서드이다.

future.cancel(true);  // 실행 중이어도 중단
future.cancel(false); // 실행 중이라면 중단하지 않음

future.cancel(true) 메서드를 호출하면 스레드가 실행 중이어도 interrupt로 중단한다.
future.cancel(false) 메서드를 호출하면 실행 중인 경우에는 중단하지 않는다.
취소에 성공하면 true, 실패하면 false를 반환한다.

Future에는 Exeception도 저장되는데,
취소한 경우에는 get() 메서드를 호출하면 발생한 예외를 반환한다.

isCancelled()

future.isCancelled();

스레드 작업이 취소되었는지 확인하는 메서드이다.

isDone()

future.isDone();

스레드의 작업이 완료되었는지 확인하는 메서드이다.

state()

future.state();

Future의 상태를 반환하는 메서드이다.

  • RUNNING : 작업 실행 중
  • SUCCESS : 작업 완료
  • FAILED : 작업 실패
  • CANCELLED : 작업 취소

ExecutorService 메서드

ExecutorService를 활용하기 위해
ThreadPoolExecutor로 스레드 풀을 생성하는 과정과
Runnable의 약점을 극복하여 결과를 반환 받고 예외를 던질 수 있는 Future까지 알아봤다.

이제 ExecutorService를 활용할 수 있는 준비가 끝났다.
본격적으로 ExecutorService의 메서드에 대해 알아보자.

ExecutorService executorService = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int keepAliveTime, TimeUnit unit,
new BlockingQueue(), new ThreadPoolExecutor.POLICYTYPE())

execute()

executoreService.execute();

단일 스레드를 실행하는 메서드이다.
void 타입으로 결과를 반환하지 않고 스레드만 실행한다.

submit()

Future<T> future = executoreService.submit(new Callable())

execute()와 마찬가지로 단일 스레드를 실행하지만, 결과를 반환하는 메서드이다.
결과로는 Future 객체를 반환하고
스레드의 결과물을 얻고 싶은 경우 Future의 get() 메서드를 호출해야 한다.

Runnable도 submit() 할 수 있다.
Runnable submit() 이후 get() 메서드를 호출하면 마찬가지로 Blocking 하고
Runnable은 반환 타입이 없기 때문에 결과로는 null을 반환한다.

invokeAll()

List<Future<T>> futures = executoreService.invokeAll(Collection<T> tasks);

모든 Callable 작업을 컬렉션으로 제출한다.
모든 Callable의 결과를 Future List를 반환한다.

invokeAny()

T result = executoreService.invokeAny(Collection<T> tasks);

invokeAll()과 마찬가지로 모든 Callable 작업을 컬렉션으로 제출한다.
하지만 Future를 반환한 후 get() 메서드를 호출하여 결과를 반환하는 것이 아닌,
Callable 작업들 중 가장 먼저 완료된 하나의 작업의 결과를 직접 반환한다.
get() 메서드와 마찬가지로 Blocking 메서드이다.

shutdown()

executoreService.shutdown();

스레드 사용은 반드시 종료해주어야 한다.
새로운 작업을 차단하면서 이미 제출된 작업을 모두 완료하고 종료하는 메서드이다.

close()

executoreService.close();

shutdown()과 같이 스레드 사용을 종료하는 메서드이다.
자바 19버전 이후로 지원하며, 19버전 이전은 shutdown() 메서드를 호출해야 한다.
내부적으로 shutdown() 메서드를 호출한 이후,
종료가 지연되는 경우나 interrupt가 발생하면 shutdownNow() 메서드를 호출한다.
shutdown() + shutdownNow()를 수행하는 메서드이다.

shutdownNow()

List<Runnable> runnables = executoreService.shutdownNow();

shutdown() 메서드는 새로운 작업은 차단하지만 제출된 작업은 모두 완료하는 메서드였다.
shutdownNow()는 새로운 작업은 물론 기존 작업까지 중단하고 즉시 종료한다.
실행 대기 중인 작업, 실행 중인 작업 모두 interrupt를 발생하여 중단한다.
중단된 작업들을 반환한다.

awaitTermination()

executoreService.awaitTermination(long time, TimeUnit unit);

서비스 종료(shutdown)까지 대기하는 메서드이다.
대기하기 때문에 블로킹 메서드이다.
대기할 시간을 지정할 수 있다.

안전하게 서비스가 종료되었는지 확인할 수 있고
시간이 초과된 경우 추가 조치를 수행할 수 있다.

isShutdown()

executoreService.isShutdown();

서비스가 종료 메서드가 호출되었는지 확인하는 메서드이다.
shutdown(), shutdownNow() 메서드 호출 여부를 확인한다.

isTerminated()

서비스가 종료되었는지 확인하는 메서드이다.
shutdown(), shutdownNow() 호출 이후 남은 작업까지 완료되었는지 확인한다.

마무리

스레드의 약점을 보완한 진화된 스레드를 이해하기 위해 아주 머나먼 여정이었다.
자원 소모에서 오는 약점을 보완하기 위한 스레드풀 ThreadPoolExecutor을 알아봤고
Runnable의 단점인 반환 타입, 예외 던지기를 극복하는 Callable과
Future에 대해 알아봤다.

그리고 Executor, ExecutorService의 메서드로 이를 활용할 수 있다.
잊지 말자. 멀티스레드를 정복하기 위하여.

profile
복습에 대한 비판과 지적을 부탁드립니다

0개의 댓글