문제: 자바에서 직접 Thread를 생성해 사용할 경우, 아래와 같은 3가지 문제가 존재합니다.
1. 스레드 생성 비용으로 인한 성능 문제
2. 스레드 관리의 어려움
무한정 생성 불가 (CPU/메모리 한계)
이벤트 폭주 시:
예: 마케팅 이벤트 → 평소 100개의 스레드로 충분했지만 갑자기 10,000개의 요청
→ 서버 과부하, 장애 가능성이 있습니다.
스레드가 어디서, 얼마나 있는지 관리가 안됩니다. → 정상 종료 및 인터럽트 불가능
③ Runnable 인터페이스의 제약
public interface Runnable {
void run(); // 반환값 없음, 예외 던질 수 없음
}
문제점
join() 으로 기다려야 합니다.run()은 throws Exception을 사용할 수 없습니다.즉, 실행 결과 및 예외 정보를 직접 얻기 어렵습니다.
해결책 요약
→ 이 모든 문제를 자바에서는 Executor 프레임워크로 해결 가능합니다!
Executor 프레임워크는 자바에서 제공하는 멀티스레드 작업 처리 시스템입니다.
구조 요약
1. Executor 인터페이스 (기본)
public interface Executor {
void execute(Runnable command);
}
Runnable을 받아서 실행합니다.2. ExecutorService 인터페이스
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
Executor의 확장 인터페이스입니다.submit)shutdown, close)Future)ExecutorService를 사용합니다.3. 대표 구현체: ThreadPoolExecutor
ExecutorService의 가장 널리 사용되는 기본 구현입니다.로그 출력 유틸리티 구현하기
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static util.MyLogger.log;
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize(); // 스레드 풀 내 전체 스레드 개수
int active = poolExecutor.getActiveCount(); // 작업 중인 스레드 수
int queuedTasks = poolExecutor.getQueue().size(); // 큐에 대기 중인 작업 수
long completedTask = poolExecutor.getCompletedTaskCount(); // 완료된 작업 수
log("[pool=" + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
// 추가
public static void printState(ExecutorService executorService, String taskName) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queuedTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log(taskName + " -> [pool=" + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
}
| 항목 | 의미 |
|---|---|
pool | 스레드 풀 내 전체 스레드 수 |
active | 작업 중인 스레드 수 |
queuedTasks | 큐에 대기 중인 작업 수 |
completedTasks | 완료된 작업 수 |
package thread.executor;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000; // 1초 걸리는 작업
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs); // 작업 시간 시뮬레이션
log(name + " 완료");
}
}
package thread.executor;
import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorBasicMain {
public static void main(String[] args) {
ThreadPoolExecutor es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 ==");
printState(es);
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
es.close();
log("== shutdown 완료 ==");
printState(es);
}
}
21:20:15.171 [ main] == 초기 상태 ==
21:20:15.196 [ main] [pool=0, active=0, queuedTasks=0, completedTask=0]
21:20:15.198 [ main] == 작업 수행 중 ==
21:20:15.199 [pool-1-thread-2] taskB 시작
21:20:15.199 [pool-1-thread-1] taskA 시작
21:20:15.199 [ main] [pool=2, active=2, queuedTasks=2, completedTask=0]
21:20:16.210 [pool-1-thread-1] taskA 완료
21:20:16.210 [pool-1-thread-2] taskB 완료
21:20:16.210 [pool-1-thread-1] taskC 시작
21:20:16.210 [pool-1-thread-2] taskD 시작
21:20:17.215 [pool-1-thread-1] taskC 완료
21:20:17.215 [pool-1-thread-2] taskD 완료
21:20:18.215 [ main] == 작업 수행 완료 ==
21:20:18.216 [ main] [pool=2, active=0, queuedTasks=0, completedTask=4]
21:20:18.217 [ main] == shutdown 완료 ==
21:20:18.218 [ main] [pool=0, active=0, queuedTasks=0, completedTask=4]
설명
LinkedBlockingQueue (무한 큐)실행 순서 요약
taskA, taskB → 바로 실행 (2개의 스레드 사용)taskC, taskD → 큐에 대기taskA, taskB 완료 → 스레드 반납taskC, taskD 순차 실행ThreadPoolExecutor란?
ThreadPoolExecutor는 자바의 ExecutorService 인터페이스를 구현한 스레드 풀의 표준 구현체입니다.
스레드 풀을 직접 구성하면서, 다양한 동작 정책을 제어할 수 있도록 설계된 매우 유연한 클래스입니다.
ThreadPoolExecutor 생성자
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
)
| 파라미터 | 설명 |
|---|---|
corePoolSize | 항상 유지되는 기본 스레드 수입니다. 작업이 없더라도 유지됨. |
maximumPoolSize | 생성 가능한 최대 스레드 수입니다. 큐가 가득 찼을 때 확장용 |
keepAliveTime | corePoolSize를 초과해서 생성된 스레드가 작업이 없을 때 얼마 동안 유지할지 |
unit | keepAliveTime의 시간 단위 (TimeUnit.SECONDS 등) |
workQueue | 작업을 담아두는 BlockingQueue입니다. (예: LinkedBlockingQueue) |
작동 흐름 요약 (중요!)
1. 작업 요청이 들어옴 (es.execute(task))
2. 현재 스레드 수 < corePoolSize → 새 스레드 생성
3. corePoolSize 이상이면 → workQueue에 task 저장
4. queue가 꽉 찼고 스레드 수 < maximumPoolSize → 스레드 추가 생성
5. queue가 꽉 찼고 스레드 수 == maximumPoolSize → rejectHandler 동작
주요 동작 요소
1. corePoolSize
2. maximumPoolSize
corePoolSize == maximumPoolSize이면 고정 크기 스레드 풀이 됩니다.3. keepAliveTime + unit
corePoolSize를 초과하는 스레드는 일정 시간동안 작업이 없으면 제거4. BlockingQueue<Runnable>
스레드가 바쁠 때 작업을 담아둘 대기열
큐 종류에 따라 동작이 달라집니다:
| 큐 타입 | 특징 |
|---|---|
LinkedBlockingQueue | 무한 큐 (기본), 큐에 무조건 저장 |
ArrayBlockingQueue | 크기 제한 있음 |
SynchronousQueue | 저장 공간 없음, 직접 전달 방식 |
PriorityBlockingQueue | 우선순위 기반 큐 |
스레드 수 결정 방식 요약
| 상황 | 동작 |
|---|---|
현재 스레드 수 < corePoolSize | 무조건 새 스레드 생성 |
현재 스레드 수 ≥ corePoolSize && 큐가 비어 있음 | 큐에 저장 |
큐가 꽉 참 && 스레드 수 < maximumPoolSize | 새 스레드 생성 |
큐가 꽉 참 && 스레드 수 ≥ maximumPoolSize | RejectedExecutionHandler 호출 (예외 발생 가능) |
기타 설정
RejectedExecutionHandler: 작업을 처리할 수 없을 때 호출되는 거부 정책 처리자입니다.
자주 쓰이는 정책들:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 기본
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
ThreadPoolExecutor를 쉽게 생성하는 방법
Executors 유틸 클래스가 편의 메서드를 제공합니다:
| 메서드 | 설명 |
|---|---|
Executors.newFixedThreadPool(n) | corePoolSize == maxPoolSize == n |
Executors.newCachedThreadPool() | corePoolSize = 0, max = Integer.MAX_VALUE, keepAliveTime = 60s |
Executors.newSingleThreadExecutor() | 단일 스레드 |
Executors.newScheduledThreadPool(n) | 예약 및 반복 작업 가능 |
→ 내부적으로 모두 ThreadPoolExecutor를 사용합니다.
시각화
작업 요청
↓
+-------------+ execute() +-------------------------+
| Caller(main)| ───────────▶ | ThreadPoolExecutor |
+-------------+ | |
| 1. 작업 실행할 스레드 선택
| 2. 큐 저장 or 스레드 생성
| 3. 실행 or 거부
+-------------------------+
Runnable 인터페이스
package java.lang;
public interface Runnable {
void run(); // 반환값 없음, 예외도 못 던짐
}
단점 요약
| 항목 | 설명 |
|---|---|
| ❌ 반환값 없음 | run()은 void → 작업 결과를 리턴할 수 없음 |
| ❌ 예외 선언 불가 | throws Exception을 선언할 수 없음 |
| ❗ 결과를 공유해야 함 | 결과를 공유하려면 필드에 저장하고 join()으로 기다려야 함 |
Runnable 사용 예시
MyRunnable task = new MyRunnable();
Thread thread = new Thread(task);
thread.start();
thread.join(); // 결과 기다림
int result = task.value;
값은 내부 필드에 저장 → main 스레드에서 꺼냅니다.
결과적으로...
Callable<T> 인터페이스
package java.util.concurrent;
public interface Callable<V> {
V call() throws Exception;
}
| 특징 | 설명 |
|---|---|
| ✅ 반환값 있음 | V 타입으로 결과 반환 가능 |
| ✅ 예외 처리 가능 | throws Exception 선언 가능 |
Future<T> 인터페이스
Future<Integer> future = executorService.submit(new MyCallable());
Integer result = future.get(); // 결과 기다리기
| 기능 | 설명 |
|---|---|
get() | 결과를 기다리고 받음 (블로킹) |
isDone() | 작업 완료 여부 확인 |
cancel() | 작업 취소 가능 |
isCancelled() | 취소 여부 확인 |
submit()과 get()의 흐름
1. submit() 호출 → 작업 큐에 등록
2. 스레드 풀에서 작업 실행
3. Future 객체는 즉시 반환됨 (작업은 아직 안 끝났음)
4. future.get() 호출 시:
- 작업 완료 → 결과 즉시 반환
- 미완료 → 대기 (블로킹)
Runnable vs Callable 비교
| 항목 | Runnable | Callable |
|---|---|---|
| 반환값 | ❌ 없음 | ✅ 있음 |
| 예외 처리 | ❌ 불가 | ✅ 가능 |
| 스레드 직접 제어 | 필요 | 필요 없음 |
| 코드 간결성 | 복잡함 | 직관적 |
예시 코드
package thread.executor.future;
import java.util.Random;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1); // ExecutorService 생성
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 완료");
return value;
}
}
}
submit()으로 작업 제출Future.get()으로 결과 수신 (2초 블로킹)기존 코드
ExecutorService es = new ThreadPoolExecutor(1,1,0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
편의 코드
ExecutorService es = Executors.newFixedThreadPool(1);
실행 결과
14:39:47.764 [pool-1-thread-1] Callable 시작
14:39:49.776 [pool-1-thread-1] create value = 4
14:39:49.776 [pool-1-thread-1] Callable 완료
14:39:49.777 [ main] result value = 4
먼저 MyCallable을 구현하는 부분을 살펴봅니다.
static class MyCallable implements Callable<Integer> {
...
}
<Integer>로 선언했습니다.Runnable 코드와 비슷한데, 유일한 차이는 결과를 필드에 담아두는 것이 아니라, 결과를 반환한다는 점 입니다.submit()
ExcutorService가 제공하는 submit()을 통해 Callable을 작업으로 전달할 수 있습니다.
<T> Future<T> submit(Callable<T> task); //인터페이스 정의
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
Future<Integer> future = es.submit(new MyCallable());
...
}
MyCallable 인터페이스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것 입니다.
이때 작업의 처리 결과는 직접 반환되는 것이 아니라, Future라는 특별한 인터페이스를 통해 반환됩니다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
...
}
future.get()은 InterruptException, ExcutionException 체크 예외를 던집니다. 여기서는 잡지말고 간단하게 밖으로 던지도록 작성했습니다.
목표: 1부터 100까지의 합을 구하는 문제를 두 개의 작업으로 나누어 병렬 처리합니다.
1 ~ 50 → task151 ~ 100 → task2(1) Runnable 버전 예제
코드 요약
Thread thread1 = new Thread(new SumTask(1, 50));
Thread thread2 = new Thread(new SumTask(51, 100));
thread1.start(); thread2.start();
thread1.join(); thread2.join();
문제점
result를 멤버 필드로 저장합니다.join()으로 결과 기다립니다.(2) Callable + Future 버전 예제
코드 요약
ExecutorService es = Executors.newFixedThreadPool(2);
Future<Integer> future1 = es.submit(new SumTask(1, 50));
Future<Integer> future2 = es.submit(new SumTask(51, 100));
int result = future1.get() + future2.get();
장점
| 항목 | 설명 |
|---|---|
| ✅ 반환값 명확 | call()의 return 값으로 바로 전달 |
| ✅ 예외 처리 가능 | throws Exception 사용 가능 |
| ✅ 스레드 관리 불필요 | join(), Thread 사용 없음 |
코드 흐름
Future<Integer> future = executor.submit(new MyCallable());
이 시점에 MyCallable은 실행되지 않습니다.
→ 내부적으로 FutureTask라는 객체로 감싸져서 큐에 들어갑니다.
Integer result = future.get(); // 필요 시점에만 블로킹
실행 흐름 요약
1. submit() 호출 → Future 즉시 반환 (비동기)
2. 작업은 ThreadPoolExecutor 내부에서 실행 대기
3. 실행 완료 시 → Future 내부에 결과 저장
4. get() 호출 시:
- 완료됨 → 즉시 반환
- 미완료 → 블로킹 대기
단순화된 구조
// 이전 방식
Thread t = new Thread(...);
t.start();
t.join(); return result;
// Future 방식
Future<Integer> future = executor.submit(callable);
Integer result = future.get();
→ 복잡한 스레드 생성, 공유, 종료 코드 제거
→ 싱글 스레드처럼 깔끔한 멀티스레드 처리
나쁜 예
Future<Integer> future = es.submit(task);
Integer result = future.get(); // 즉시 대기
submit()과 동시에 get() 호출하면 → 직렬 처리처럼 동작 (총 4초)
좋은 예
Future<Integer> f1 = es.submit(task1);
Future<Integer> f2 = es.submit(task2);
// 요청 완료 후 결과 수신
Integer r1 = f1.get();
Integer r2 = f2.get();
동시에 작업 요청 → 결과 병렬 수신
정리
| 항목 | 설명 |
|---|---|
submit() | 작업 요청, Future 반환 |
Future.get() | 결과 수신, 필요 시 대기 |
Callable | Runnable보다 유연: 값 반환 + 예외 처리 |
FutureTask | Runnable과 Future를 함께 구현한 내부 클래스 |
Future 주요 API 정리Future<T> 인터페이스는 비동기 작업의 상태를 확인하고 결과를 가져오며, 작업을 취소할 수 있는 기능을 제공합니다.
Future<T> 주요 메서드 요약
| 메서드 | 설명 |
|---|---|
get() | 작업이 완료될 때까지 대기 → 완료 후 결과 반환 |
get(long timeout, TimeUnit unit) | 일정 시간 동안만 대기, 초과 시 TimeoutException 발생 |
cancel(boolean mayInterruptIfRunning) | 작업 취소 시도 |
isDone() | 작업 완료 여부 확인 (true 반환: 정상 종료/예외/취소 포함) |
isCancelled() | 작업이 취소되었는지 확인 |
state() (Java 19+) | 작업 상태 반환 (RUNNING, SUCCESS, FAILED, CANCELLED) |
cancel(boolean mayInterruptIfRunning)
future.cancel(true); // 실행 중이라면 interrupt 시도
| 파라미터 값 | 의미 |
|---|---|
true | 실행 중이라면 Thread.interrupt()로 작업 중단 시도 |
false | 작업이 실행 중이면 그냥 내버려둠 (취소만 표시) |
작업이 실행되기 전이라면 즉시 취소됨
실행 중이라면 인터럽트 허용 여부에 따라 중단 시도
get()과 get(timeout)의 차이
Integer result = future.get(); // 무한 대기
Integer result = future.get(3, TimeUnit.SECONDS); // 3초 초과 시 TimeoutException
get()은 작업이 완료될 때까지 무조건 기다림get(timeout)은 일정 시간 초과하면 예외 발생isDone()과 isCancelled()
isDone():true 반환 시점:isCancelled():true이면 cancel() 호출되었고, 취소 성공한 상태cancel() 실습 결과 예mayInterruptIfRunning = true
future.cancel(true);
InterruptedException 처리 필요Future.get() 호출 시 CancellationException 발생mayInterruptIfRunning = false
future.cancel(false);
Future.get() 시에도 CancellationException정리: 상태 흐름
초기 상태: NEW
→ 작업 제출: RUNNING
→ 정상 완료: SUCCESS
→ 예외 발생: FAILED
→ 취소됨: CANCELLED
Java 19 이상에서는 future.state()로 위 상태를 직접 조회 가능
예외 처리 정리
| 상황 | get() 예외 |
|---|---|
| 작업 중 인터럽트 | InterruptedException |
| 작업 내부 예외 발생 | ExecutionException (원인 포함) |
cancel(true) 후 get() | CancellationException |
invokeAll, invokeAny)기존 방식의 문제
submit()을 반복 호출해야 했고,Future를 받아서 일일이 get() 호출 필요 → 복잡 + 코드 반복1. invokeAll(Collection<Callable<T>> tasks) 특징
예시
List<Callable<Integer>> tasks = List.of(
() -> { return 1; },
() -> { return 2; },
() -> { return 3; }
);
List<Future<Integer>> futures = executorService.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer result = future.get(); // 순서대로 결과 받음
}
2. invokeAny(Collection<Callable<T>> tasks) 특징
예시
List<Callable<String>> tasks = List.of(
slowTask1, fastTask2, slowTask3
);
String result = executorService.invokeAny(tasks);
가장 빠른 작업의 결과만 받음
비교 요약
| 메서드 | 특징 | 반환값 |
|---|---|---|
invokeAll | 모든 작업 완료 대기 | List<Future<T>> |
invokeAny | 가장 빠른 작업 하나만 | T (단일 결과) |
실무 활용 팁
| 사용 상황 | 적절한 메서드 |
|---|---|
| 작업 전체가 필요할 때 | invokeAll |
| 하나만 빠르게 필요할 때 | invokeAny |
문제 1: Future.get() 남용 → 병렬의 이점 무효화
잘못된 예시
Integer sum1 = executor.submit(task1).get(); // 즉시 대기
Integer sum2 = executor.submit(task2).get(); // 또 대기
task1 + task2 → 병렬의 장점 없음✅ 올바른 예시
Future<Integer> f1 = executor.submit(task1); // 먼저 요청
Future<Integer> f2 = executor.submit(task2); // 병렬 요청
Integer sum1 = f1.get(); // 기다림
Integer sum2 = f2.get(); // 이미 완료 → 바로 반환될 가능성 높음
get()은 결과가 필요할 때만 호출문제 2: 취소 처리 누락
Future.cancel(true)를 사용하지 않으면, 실행 중인 작업을 제어할 수 없음문제 3: 예외 발생 시 처리 미흡
try {
future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
...
}
get() 중 예외 발생 시 ExecutionException 래핑됨e.getCause()로 원인 확인 필요문제 4: timeout 설정 없이 get()
future.get(3, TimeUnit.SECONDS);
핵심 개념 정리
| 개념 | 설명 |
|---|---|
ExecutorService | 작업 실행 및 스레드 풀 관리 |
Callable<T> | 값 반환 및 예외 처리 가능한 작업 단위 |
Future<T> | 작업 결과, 상태, 취소를 제어하는 인터페이스 |
submit() | 작업 제출, Future 반환 |
get() | 결과 수신 (필요 시 블로킹) |
invokeAll() | 모든 작업 병렬 실행 후 결과 수집 |
invokeAny() | 가장 빠른 결과 하나만 수집 |
cancel() | 작업 취소, 실행 중이면 인터럽트 가능 |
실무 권장 사용 패턴
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Callable<Integer>> tasks = ...
try {
List<Future<Integer>> results = executor.invokeAll(tasks, 5, TimeUnit.SECONDS);
for (Future<Integer> f : results) {
if (!f.isCancelled()) {
process(f.get());
}
}
} catch (Exception e) {
// 예외 처리
} finally {
executor.shutdown();
}
isCancelled() 확인