F-LAB JAVA · 4주차 · Phase 7 · Executor 프레임워크
이 Unit을 끝내면 다음을 답할 수 있어야 한다.
작업 큐는 스레드 풀이 처리하지 못한 작업을 대기시키는 BlockingQueue 이며, 큐가 가득 차고 스레드도 max 에 도달하면 거부 정책 (RejectedExecutionHandler) 이 작동한다.
LinkedBlockingQueue는 링크드 노드 기반으로 크기를 지정하지 않으면 무제한,ArrayBlockingQueue는 고정 크기 배열,SynchronousQueue는 크기 0 으로 작업을 큐에 담지 않고 워커에게 직접 전달하며,PriorityBlockingQueue는 우선순위 순으로 작업을 꺼낸다.
거부 정책 4가지는AbortPolicy(예외 던짐, 기본),CallerRunsPolicy(호출자 스레드가 직접 실행, 백프레셔),DiscardPolicy(조용히 버림),DiscardOldestPolicy(큐의 가장 오래된 작업을 버리고 새 작업 추가) 다.
큐와 정책의 조합으로 시스템 특성을 결정한다 — 예를 들어 제한된 큐 + CallerRunsPolicy 는 과부하 시 호출자가 직접 처리하여 자연스러운 백프레셔를 제공한다.
무제한 큐는 거부가 일어나지 않지만 OOM 위험이 있으므로, 운영 환경에서는 제한된 큐 + 적절한 거부 정책 이 권장된다.
작업 큐 = 대기 방식:
LinkedBlockingQueue (대기 명단):
- 명단에 이름 적기
- 크기 무제한 (또는 제한)
ArrayBlockingQueue (대기 의자):
- 고정 개수 의자
- 다 차면 못 앉음
SynchronousQueue (직접 안내):
- 대기 X
- 자리 나면 바로 안내
- 자리 없으면 즉시 거부
PriorityBlockingQueue (VIP 우선):
- 우선순위 순 입장
거부 정책 = 만석일 때:
AbortPolicy:
- "죄송합니다 만석입니다" (예외)
CallerRunsPolicy:
- "직접 주방 도와주세요" (호출자 실행)
- → 손님 줄이는 효과 (백프레셔)
DiscardPolicy:
- 조용히 무시 (말없이)
DiscardOldestPolicy:
- 가장 오래 기다린 사람 내보내고 새 손님
→ 큐 (대기 방식) + 거부 정책 (만석 처리) 조합으로 시스템 특성 결정.
1. LinkedBlockingQueue
2. ArrayBlockingQueue
3. SynchronousQueue
4. PriorityBlockingQueue
5. AbortPolicy
6. CallerRunsPolicy
7. DiscardPolicy / DiscardOldestPolicy
8. 큐와 정책 조합
9. 면접 + 자기 점검
LinkedBlockingQueue:
링크드 노드 기반 BlockingQueue.
- 크기 지정 X → 무제한 (Integer.MAX_VALUE)
- 크기 지정 → 제한
- FIFO
LinkedBlockingQueue 특성:
- 링크드 노드 (동적)
- 두 개의 락 (put/take 분리)
- 높은 처리량 (생산/소비 동시)
크기:
- 무제한 (기본)
- 제한 (생성자)
// 무제한
BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();
// 제한
BlockingQueue<Runnable> bounded = new LinkedBlockingQueue<>(1000);
// ThreadPoolExecutor 에서
new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000) // 제한 권장
);
무제한 LinkedBlockingQueue:
크기 지정 안 하면:
- 무제한
- max 도달 X (큐 안 참)
- 작업 무한 쌓임 → OOM
→ 제한 권장 (운영)
LinkedBlockingQueue 의 두 락:
putLock (추가용)
takeLock (제거용)
효과:
- 생산자와 소비자 동시
- 높은 처리량
vs ArrayBlockingQueue:
- 단일 락
@Configuration
public class LinkedBlockingQueueConfig {
// 제한된 큐 (권장)
@Bean
public ThreadPoolExecutor boundedExecutor() {
return new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 1000 제한
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 큐 가득 차면 max 확장 → 거부 정책
}
// ❌ 무제한 (위험)
@Bean
public ThreadPoolExecutor unboundedExecutor() {
return new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>() // 무제한 → OOM 위험
);
}
}
LinkedBlockingQueue의 특성은?
답:
1. 정의:
특성:
무제한 위험:
권장:
ArrayBlockingQueue:
고정 크기 배열 기반.
- 생성 시 크기 고정
- FIFO
- 단일 락
ArrayBlockingQueue 특성:
- 고정 크기 (변경 X)
- 배열 (메모리 효율)
- 단일 락 (put/take)
- 공정성 옵션
// 고정 크기 (필수)
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(500);
// 공정성 옵션
BlockingQueue<Runnable> fair = new ArrayBlockingQueue<>(500, true);
// ThreadPoolExecutor
new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500)
);
| 항목 | Linked | Array |
|---|---|---|
| 구조 | 링크드 노드 | 배열 |
| 크기 | 무제한/제한 | 고정 |
| 락 | 두 개 | 단일 |
| 처리량 | 높음 | 보통 |
| 메모리 | 동적 | 고정 |
메모리:
ArrayBlockingQueue:
- 미리 배열 할당 (고정)
- 예측 가능
- 노드 객체 X
LinkedBlockingQueue:
- 노드 동적 생성
- 가비지 발생
@Configuration
public class ArrayBlockingQueueConfig {
// 고정 크기 (메모리 예측)
@Bean
public ThreadPoolExecutor arrayQueueExecutor() {
return new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500), // 고정 500
new ThreadPoolExecutor.AbortPolicy()
);
// 메모리 예측 가능 (배열 미리 할당)
// 큐 가득 차면 → max → 거부
}
// 공정성 (순서 보장)
@Bean
public ThreadPoolExecutor fairQueueExecutor() {
return new ThreadPoolExecutor(
4, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500, true), // 공정
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
ArrayBlockingQueue의 특성은?
답:
1. 정의:
특성:
vs Linked:
메모리:
SynchronousQueue:
크기 0 큐 (저장 X).
- 작업을 담지 않음
- 생산자 ↔ 소비자 직접 전달
- 핸드오프 (handoff)
SynchronousQueue 직접 전달:
put:
- 받을 take 있어야 성공
- 없으면 대기
ThreadPoolExecutor 에서:
- 작업이 바로 워커로
- 워커 없으면 새로 생성
ThreadPoolExecutor + SynchronousQueue:
작업 제출:
- 유휴 워커 있으면 → 즉시 전달
- 없으면 → 새 스레드 (max 까지)
- max 도달 → 거부
→ 큐 대기 X (즉시 처리 또는 생성)
// newCachedThreadPool 의 큐
new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>() // 직접 전달
);
// 작업 = 즉시 스레드 (큐 X)
// 무제한 스레드 (위험)
SynchronousQueue 특성:
- 크기 0
- 핸드오프
- 큐 대기 없음
용도:
- 즉시 처리
- Cached 풀
- 직접 전달
@Configuration
public class SynchronousQueueConfig {
// 직접 전달 (즉시 처리, max 제한)
@Bean
public ThreadPoolExecutor directHandoffExecutor() {
return new ThreadPoolExecutor(
0, // core 0
50, // max 50 (제한!)
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), // 직접 전달
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 작업 → 즉시 스레드 (큐 X)
// max 50 제한 (무제한 방지)
// 50 초과 → 호출자 실행
}
}
SynchronousQueue의 특성은?
답:
1. 정의:
동작:
Cached 패턴:
용도:
PriorityBlockingQueue:
우선순위 기반 큐.
- 우선순위 순으로 take
- Comparable 또는 Comparator
- 무제한 (동적)
우선순위:
작업이 우선순위 가짐:
- 높은 우선순위 먼저 처리
- FIFO 아님
Comparable:
- compareTo 로 순서
Comparator:
- 생성자에 지정
// 우선순위 작업
class PriorityTask implements Runnable, Comparable<PriorityTask> {
final int priority;
final Runnable task;
PriorityTask(int priority, Runnable task) {
this.priority = priority;
this.task = task;
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority); // 높은 우선순위 먼저
}
@Override
public void run() { task.run(); }
}
// PriorityBlockingQueue 사용
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 4, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>()
);
executor.execute(new PriorityTask(10, () -> urgent())); // 높은 우선순위
executor.execute(new PriorityTask(1, () -> normal())); // 낮은 우선순위
PriorityBlockingQueue 주의:
- 무제한 (크기 제한 X)
- OOM 위험
- FIFO 아님 (순서 보장 X)
- 같은 우선순위는 순서 불확실
용도:
- 긴급 작업 우선
- SLA 기반
- VIP 처리
예:
- 긴급 배송 먼저
- 중요 고객 우선
@Service
public class PriorityQueueUsage {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 4, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>()
);
public void submitShipment(Shipment shipment) {
int priority = calculatePriority(shipment);
executor.execute(new PriorityShipmentTask(priority, shipment));
}
private int calculatePriority(Shipment shipment) {
// 긴급 배송, VIP 고객 등
if (shipment.isUrgent()) return 100;
if (shipment.isVip()) return 50;
return 1;
}
class PriorityShipmentTask implements Runnable, Comparable<PriorityShipmentTask> {
final int priority;
final Shipment shipment;
PriorityShipmentTask(int priority, Shipment shipment) {
this.priority = priority;
this.shipment = shipment;
}
@Override
public int compareTo(PriorityShipmentTask o) {
return Integer.compare(o.priority, this.priority); // 높은 것 먼저
}
@Override
public void run() { process(shipment); }
}
private void process(Shipment s) { }
}
PriorityBlockingQueue는?
답:
1. 정의:
우선순위:
주의:
용도:
AbortPolicy (기본):
거부 시 RejectedExecutionException 던짐.
- 호출자가 예외 받음
- 명확한 실패
- 기본 정책
new ThreadPoolExecutor.AbortPolicy();
// 거부 시:
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
// 거부됨
log.warn("작업 거부", e);
// 호출자가 처리
}
AbortPolicy 특성:
- 예외 던짐
- 호출자 인지
- 명확
장점:
- 거부 명확히 인지
- 처리 강제
단점:
- 예외 처리 필요
AbortPolicy 적합:
- 거부를 명확히 알아야
- 호출자가 대응 (재시도, 알림)
- 작업 손실 방지
기본값으로 안전
@Service
public class AbortPolicyUsage {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy() // 거부 시 예외
);
public boolean submitShipment(Shipment shipment) {
try {
executor.execute(() -> process(shipment));
return true;
} catch (RejectedExecutionException e) {
log.warn("배송 처리 거부 (과부하): {}", shipment.getId());
metricService.incrementRejected();
// 명확히 인지 → 알림, 재시도 등
return false;
}
}
private void process(Shipment s) { }
}
AbortPolicy의 동작은?
답:
1. 정의:
동작:
특성:
적합:
CallerRunsPolicy:
거부 시 호출자 스레드가 직접 실행.
- 제출한 스레드가 작업 실행
- 백프레셔 효과
CallerRunsPolicy 백프레셔:
과부하 시:
- 호출자가 직접 실행
- 호출자 바빠짐
- 새 작업 제출 느려짐
- 자연스러운 속도 조절
→ 시스템 안정
new ThreadPoolExecutor.CallerRunsPolicy();
// 거부 시:
// 호출 스레드 (예: 메인) 가 직접 task.run()
// → 메인이 바빠짐
// → 다음 제출 느려짐 (백프레셔)
CallerRunsPolicy 효과:
- 작업 손실 X (실행됨)
- 속도 조절 (백프레셔)
- 과부하 완화
단점:
- 호출자 블록 (잠시)
- 순서 영향
CallerRunsPolicy 적합:
- 작업 손실 안 됨
- 백프레셔 원함
- 안정적 처리
권장:
- 대부분의 운영 환경
@Service
public class CallerRunsPolicyUsage {
// 백프레셔 (권장)
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy() // 백프레셔
);
public void submitBatch(List<Shipment> shipments) {
for (Shipment shipment : shipments) {
executor.execute(() -> process(shipment));
// 큐 + max 가득 차면:
// → 호출자 (이 스레드) 가 직접 실행
// → 제출 속도 느려짐 (백프레셔)
// → 시스템 안정 (작업 손실 X)
}
}
private void process(Shipment s) { }
}
CallerRunsPolicy의 동작은?
답:
1. 정의:
백프레셔:
효과:
적합:
DiscardPolicy:
거부 시 조용히 버림.
- 예외 X
- 로그 X
- 작업 손실
new ThreadPoolExecutor.DiscardPolicy();
// 거부 시:
executor.execute(task);
// 조용히 무시 (아무 일 없음)
// task 실행 X (손실)
DiscardOldestPolicy:
거부 시 큐의 가장 오래된 작업 버림.
- 큐에서 head 제거
- 새 작업 추가 (재시도)
new ThreadPoolExecutor.DiscardOldestPolicy();
// 거부 시:
// 1. 큐의 가장 오래된 작업 제거 (poll)
// 2. 새 작업 추가 (execute 재시도)
// → 오래된 것 손실, 최신 우선
| 정책 | 동작 | 손실 |
|---|---|---|
| Abort | 예외 | X (호출자 처리) |
| CallerRuns | 호출자 실행 | X |
| Discard | 버림 | 새 작업 |
| DiscardOldest | 오래된 것 버림 | 오래된 작업 |
적합한 경우:
Discard:
- 손실 허용
- 멱등 작업
- 로그/메트릭
DiscardOldest:
- 최신 데이터 중요
- 오래된 것 무의미
- 실시간 데이터
@Configuration
public class DiscardPolicyConfig {
// DiscardOldest — 최신 추적 정보 우선
@Bean("trackingExecutor")
public ThreadPoolExecutor trackingExecutor() {
return new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50),
new ThreadPoolExecutor.DiscardOldestPolicy()
// 오래된 추적 요청 버리고 최신 우선
// (오래된 위치 정보는 무의미)
);
}
// 커스텀 Discard — 로깅 추가
@Bean("metricsExecutor")
public ThreadPoolExecutor metricsExecutor() {
return new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
(task, executor) -> {
// Discard + 로깅
log.debug("메트릭 작업 버림 (큐 가득)");
metricService.incrementDropped();
// 메트릭은 손실 허용
}
);
}
}
DiscardPolicy / DiscardOldestPolicy의 동작은?
답:
1. Discard:
DiscardOldest:
차이:
적합:
큐 + 정책 조합:
시스템 특성 결정:
- 큐: 버퍼링
- 정책: 과부하 처리
조합으로:
- 백프레셔
- 손실 허용
- 우선순위
// 제한 큐 + CallerRunsPolicy (백프레셔)
new ThreadPoolExecutor(
cores, cores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 제한
new ThreadPoolExecutor.CallerRunsPolicy() // 백프레셔
);
// 과부하 시 호출자 실행 → 자연스러운 조절
// 작업 손실 X
// 제한 큐 + AbortPolicy (명확)
new ThreadPoolExecutor(
cores, cores * 2,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
new ThreadPoolExecutor.AbortPolicy()
);
// 과부하 시 예외 → 호출자 명확히 처리 (재시도/알림)
// SynchronousQueue + max 제한
new ThreadPoolExecutor(
0, 100,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), // 직접 전달
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 작업 = 즉시 스레드, 100 초과 시 호출자
// PriorityBlockingQueue (우선순위)
new ThreadPoolExecutor(
cores, cores,
0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>()
// 무제한 → 거부 정책 무의미
// 우선순위 순 처리
);
@Configuration
public class QueuePolicyCombinations {
private final int cores = Runtime.getRuntime().availableProcessors();
// 1. 핵심 처리 — 제한 큐 + CallerRuns (백프레셔)
@Bean("coreProcessing")
public ThreadPoolExecutor coreProcessing() {
return new ThreadPoolExecutor(
cores, cores * 2, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
// 2. 추적 갱신 — DiscardOldest (최신 우선)
@Bean("trackingUpdate")
public ThreadPoolExecutor trackingUpdate() {
return new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
// 3. 긴급 배송 — Priority (우선순위)
@Bean("urgentDelivery")
public ThreadPoolExecutor urgentDelivery() {
return new ThreadPoolExecutor(
4, 4, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>()
);
}
// 4. 메트릭 — Discard (손실 허용)
@Bean("metrics")
public ThreadPoolExecutor metrics() {
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.DiscardPolicy()
);
}
}
큐와 정책 조합의 실무 시나리오는?
답:
1. 안정 (권장):
명확:
최신 우선:
우선순위:
| Q | 핵심 답변 |
|---|---|
| LinkedBlockingQueue? | 링크드, 무제한/제한 |
| ArrayBlockingQueue? | 고정 배열 |
| SynchronousQueue? | 크기 0, 직접 전달 |
| PriorityBlockingQueue? | 우선순위 |
| AbortPolicy? | 예외 (기본) |
| CallerRunsPolicy? | 호출자 실행 (백프레셔) |
| DiscardPolicy? | 버림 |
| DiscardOldestPolicy? | 오래된 것 버림 |
| 백프레셔 조합? | 제한 큐 + CallerRuns |
| 무제한 큐 거부? | 거부 안 일어남 (OOM) |
답:
답:
답:
답:
답:
1. 작업 큐
2. 거부 정책
3. 조합
이번 Unit에서 큐와 거부 정책을 봤다면, 다음은 스레드 풀 종료 (Phase 7 마지막).
🚀 Phase 7 — Executor 프레임워크 (★ 2차 정점)
✅ Unit 7.1 스레드 풀의 필요성
✅ Unit 7.2 Executor와 ExecutorService
✅ Unit 7.3 Future와 Callable
✅ Unit 7.4 ThreadPoolExecutor 내부 (★ 마스터)
✅ Unit 7.5 스레드 풀 종류
✅ Unit 7.6 작업 큐와 거부 정책 ← 여기
⏭ Unit 7.7 스레드 풀 종료 — Phase 7 완주
✅ Phase 1~6 (25 Unit, 1차 정점 완료)
🚀 Phase 7 — Executor (6/7 진행) ★ 2차 정점
총: 31/35 Unit