F-LAB JAVA · 4주차 · Phase 8 · 고급 비동기
이 Unit을 끝내면 다음을 답할 수 있어야 한다.
ForkJoinPool 은 큰 작업을 작은 하위 작업으로 분할 (fork) 하고 결과를 합치는 (join) 분할 정복에 특화된 스레드 풀로, 각 스레드가 자신의 작업 큐 (deque) 를 가지고 유휴 시 다른 스레드의 작업을 훔쳐오는 (work stealing) 방식으로 부하를 분산한다.
분할 정복 은 작업이 충분히 작아질 때까지 재귀적으로 나누고 (fork), 작은 작업들을 처리한 뒤 결과를 합치는 (join) 방식이다.
work stealing 의 핵심은 각 워커 스레드가 자신의 deque (양방향 큐) 를 가지고 자기 작업은 LIFO (한쪽 끝) 로 처리하며, 큐가 비면 다른 스레드의 deque 반대쪽 끝에서 FIFO 로 작업을 훔쳐온다는 점이다.
fork()는 하위 작업을 비동기로 큐에 넣고,join()은 그 결과를 기다려 받으며, 이 작업들은ForkJoinTask(RecursiveTask/RecursiveAction) 로 표현된다.
별도 지정이 없으면 JVM 전역의 commonPool (코어 수 - 1 크기) 을 사용하며, 병렬 스트림 (parallelStream) 도 내부적으로 이 commonPool 을 쓴다.
ForkJoinPool = 설문 집계 팀:
분할 정복:
- 설문 100만 장
- 너무 많아 → 반으로 나눔 (fork)
- 또 반으로 (재귀)
- 충분히 작으면 (예: 1000장) → 직접 집계
- 결과 합침 (join)
work stealing:
- 각 직원 자기 책상 더미 (deque)
- 자기 더미 위에서 처리 (LIFO)
- 자기 것 다 끝나면
- 옆 직원 더미 아래에서 가져옴 (FIFO 훔치기)
→ 노는 직원 없음 (부하 분산)
fork / join:
- fork: "이 더미 너가 해" (분할)
- join: "결과 내놔" (합침)
commonPool:
- 회사 공용 집계팀 (전역 공유)
→ ForkJoinPool = 분할 정복 + work stealing (deque, LIFO/FIFO), fork/join.
1. ForkJoinPool의 정의
2. 분할 정복
3. work stealing
4. deque와 LIFO/FIFO
5. fork()와 join()
6. ForkJoinTask와 commonPool
7. ThreadPoolExecutor와 차이
8. 병렬 스트림과의 관계
9. 면접 + 자기 점검
ForkJoinPool (Java 7):
분할 정복에 특화된 스레드 풀.
- 큰 작업 → 작은 작업 (fork)
- 결과 합침 (join)
- work stealing
ForkJoinPool 목적:
CPU 집약적 작업의 병렬 처리:
- 큰 작업 분할
- 여러 코어 활용
- 부하 분산 (work stealing)
용도:
- 재귀 분할 작업
- 병렬 정렬, 검색
- 병렬 스트림
// 직접 생성
ForkJoinPool pool = new ForkJoinPool(); // 코어 수
ForkJoinPool pool2 = new ForkJoinPool(4); // 병렬도 4
// commonPool (전역)
ForkJoinPool common = ForkJoinPool.commonPool();
ForkJoinPool pool = new ForkJoinPool();
// 작업 제출
RecursiveTask<Long> task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task); // 실행 + 결과
ForkJoinPool 특징:
- 분할 정복
- work stealing (분산 큐)
- ForkJoinTask
- 병렬도 (parallelism)
vs ThreadPoolExecutor:
- 단일 큐 vs 분산 큐
@Service
public class ForkJoinPoolBasics {
private final ForkJoinPool pool = new ForkJoinPool();
// 대량 운임 합산 (분할 정복)
public BigDecimal sumFreight(List<Shipment> shipments) {
FreightSumTask task = new FreightSumTask(shipments, 0, shipments.size());
return pool.invoke(task); // 분할 정복 실행
}
// RecursiveTask (Unit 8.3 상세)
class FreightSumTask extends RecursiveTask<BigDecimal> {
private final List<Shipment> shipments;
private final int start, end;
private static final int THRESHOLD = 1000;
FreightSumTask(List<Shipment> shipments, int start, int end) {
this.shipments = shipments;
this.start = start;
this.end = end;
}
@Override
protected BigDecimal compute() {
if (end - start <= THRESHOLD) {
// 직접 계산
BigDecimal sum = BigDecimal.ZERO;
for (int i = start; i < end; i++) {
sum = sum.add(shipments.get(i).getWeight());
}
return sum;
}
// 분할
int mid = (start + end) / 2;
FreightSumTask left = new FreightSumTask(shipments, start, mid);
FreightSumTask right = new FreightSumTask(shipments, mid, end);
left.fork(); // 비동기
return right.compute().add(left.join()); // 합침
}
}
}
ForkJoinPool의 정의와 목적은?
답:
1. 정의:
목적:
생성:
특징:
분할 정복 (Divide and Conquer):
큰 문제를 작은 하위 문제로 나누고
결과를 합치는 방식.
단계:
1. 분할 (Divide): 작게 나눔
2. 정복 (Conquer): 작은 것 해결
3. 결합 (Combine): 결과 합침
분할 정복 재귀:
작업이 충분히 작은가?
예 → 직접 처리
아니오 → 둘로 분할 (재귀)
↓
각각 처리
↓
결과 합침
임계값 (Threshold):
분할 중단 기준:
- 작업 크기 <= 임계값
- 직접 처리 (분할 X)
너무 작으면: 오버헤드 ↑
너무 크면: 병렬성 ↓
→ 적절한 임계값 중요
// 분할 정복 의사 코드
compute(task) {
if (task 충분히 작음) {
return 직접_처리(task); // 정복
}
// 분할
[left, right] = split(task);
leftResult = fork(left); // 비동기
rightResult = compute(right); // 직접
return combine(leftResult.join(), rightResult); // 결합
}
분할 정복:
[전체 작업]
/ \
[절반] [절반]
/ \ / \
[¼] [¼] [¼] [¼]
↓ ↓ ↓ ↓
처리 처리 처리 처리
\ / \ /
합침 합침
\ /
합침 (전체)
@Service
public class DivideAndConquer {
private final ForkJoinPool pool = new ForkJoinPool();
// 대량 배송 검증 (분할 정복)
public int countValidShipments(List<Shipment> shipments) {
return pool.invoke(new CountTask(shipments, 0, shipments.size()));
}
class CountTask extends RecursiveTask<Integer> {
private final List<Shipment> shipments;
private final int start, end;
private static final int THRESHOLD = 500; // 임계값
CountTask(List<Shipment> shipments, int start, int end) {
this.shipments = shipments;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 정복 (직접 처리)
int count = 0;
for (int i = start; i < end; i++) {
if (isValid(shipments.get(i))) count++;
}
return count;
}
// 분할
int mid = (start + end) / 2;
CountTask left = new CountTask(shipments, start, mid);
CountTask right = new CountTask(shipments, mid, end);
left.fork();
int rightCount = right.compute();
int leftCount = left.join();
return leftCount + rightCount; // 결합
}
private boolean isValid(Shipment s) { return s.getWeight() != null; }
}
}
분할 정복 방식은?
답:
1. 분할 정복:
재귀:
임계값:
결합:
work stealing (작업 훔치기):
각 스레드가 자기 작업 큐를 가지고,
유휴 시 다른 스레드 작업을 훔침.
효과:
- 부하 분산
- 유휴 스레드 활용
work stealing 메커니즘:
각 워커:
- 자기 deque
- 자기 작업 처리
자기 큐 비면:
- 다른 워커 deque 에서 훔침
- 부하 분산
work stealing:
워커1 deque: [작업, 작업, 작업, 작업]
워커2 deque: [] (비음)
↓ 훔침
워커2: 워커1 deque 반대쪽에서 작업 가져옴
→ 워커2 가 놀지 않음
→ 부하 자동 분산
fork 와 큐:
fork():
- 하위 작업을 자기 deque 에 push
자기 작업:
- deque 에서 pop (LIFO)
훔치기:
- 다른 deque 에서 (FIFO)
work stealing 효율:
- 부하 불균형 자동 해소
- 코어 활용 극대화
- 락 경쟁 적음 (분산 큐)
vs 단일 큐:
- 단일 큐: 경쟁 (모든 스레드)
- 분산 큐: 자기 큐 (경쟁 적음)
@Service
public class WorkStealingExample {
// work stealing 풀 (Executors)
private final ExecutorService workStealingPool =
Executors.newWorkStealingPool(); // ForkJoinPool 기반
public void processParallel(List<Shipment> shipments) {
List<CompletableFuture<Void>> futures = shipments.stream()
.map(s -> CompletableFuture.runAsync(
() -> process(s), workStealingPool))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
// 작업이 불균형해도 work stealing 으로 분산
// (오래 걸리는 배송은 다른 워커가 도움)
}
private void process(Shipment s) { }
}
work stealing 메커니즘은?
답:
1. 정의:
메커니즘:
효율:
fork:
deque (Double-Ended Queue):
양쪽 끝에서 추가/제거 가능.
- 각 워커가 자기 deque
work stealing:
- 자기: 한쪽 끝 (LIFO)
- 훔치기: 반대쪽 끝 (FIFO)
자기 작업 LIFO:
자기 deque:
- push (한쪽 끝)
- pop (같은 끝) → LIFO
이유:
- 최근 작업 (캐시 지역성)
- 분할 정복 깊이 우선
훔치기 FIFO:
다른 deque:
- 반대쪽 끝에서 가져옴 → FIFO
이유:
- 오래된 작업 (큰 작업)
- 경쟁 회피 (반대쪽)
- 큰 작업 훔쳐 효율
deque LIFO/FIFO:
워커1 deque:
[오래된]──────[최근]
↑ ↑
훔치기(FIFO) 자기(LIFO)
- 자기: 최근 끝 (LIFO)
- 다른 워커: 오래된 끝 (FIFO 훔침)
→ 양쪽 분리 (경쟁 적음)
LIFO/FIFO 분리 효율:
자기 LIFO:
- 캐시 지역성 (최근 데이터)
- 분할 깊이 우선
훔치기 FIFO:
- 큰 작업 (오래된 것)
- 자기 작업과 충돌 X (반대쪽)
- 락 경쟁 최소
// deque 동작은 ForkJoinPool 내부
// 사용자는 fork/compute/join 만
@Service
public class DequeExample {
private final ForkJoinPool pool = new ForkJoinPool();
class ProcessTask extends RecursiveAction {
private final List<Shipment> shipments;
private final int start, end;
private static final int THRESHOLD = 100;
ProcessTask(List<Shipment> shipments, int start, int end) {
this.shipments = shipments;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
process(shipments.get(i));
}
return;
}
int mid = (start + end) / 2;
ProcessTask left = new ProcessTask(shipments, start, mid);
ProcessTask right = new ProcessTask(shipments, mid, end);
// fork → 자기 deque 에 push (LIFO)
// 다른 워커가 훔칠 수 있음 (FIFO)
invokeAll(left, right);
}
private void process(Shipment s) { }
}
}
deque와 LIFO/FIFO는?
답:
1. deque:
자기 LIFO:
훔치기 FIFO:
효율:
fork():
하위 작업을 비동기로 실행.
- 자기 deque 에 push
- 즉시 반환 (대기 X)
- 다른 워커가 처리 가능
join():
하위 작업의 결과를 기다려 받음.
- fork 한 작업 완료 대기
- 결과 반환
- (대기 중 다른 작업 처리 가능)
// fork / join 패턴
protected Long compute() {
if (작음) {
return 직접_계산();
}
// 분할
SubTask left = new SubTask(왼쪽);
SubTask right = new SubTask(오른쪽);
left.fork(); // 비동기 (deque push)
long rightResult = right.compute(); // 직접 (현재 스레드)
long leftResult = left.join(); // 결과 대기
return leftResult + rightResult; // 합침
}
// ✓ 권장 순서
left.fork(); // 1. 분할 (비동기)
long r = right.compute(); // 2. 하나는 직접
long l = left.join(); // 3. 결과 합침
// ❌ 비효율
left.fork();
right.fork(); // 둘 다 fork
long l = left.join();
long r = right.join(); // 현재 스레드 놀음
// → 하나는 compute 가 효율적
// invokeAll — 여러 작업 한번에
invokeAll(left, right);
// 또는
invokeAll(List.of(task1, task2, task3));
// fork + join 자동
@Service
public class ForkJoinMethods {
private final ForkJoinPool pool = new ForkJoinPool();
public BigDecimal calculateTotalFreight(List<Shipment> shipments) {
return pool.invoke(new FreightTask(shipments, 0, shipments.size()));
}
class FreightTask extends RecursiveTask<BigDecimal> {
private final List<Shipment> shipments;
private final int start, end;
private static final int THRESHOLD = 1000;
FreightTask(List<Shipment> shipments, int start, int end) {
this.shipments = shipments;
this.start = start;
this.end = end;
}
@Override
protected BigDecimal compute() {
if (end - start <= THRESHOLD) {
BigDecimal sum = BigDecimal.ZERO;
for (int i = start; i < end; i++) {
sum = sum.add(shipments.get(i).getWeight());
}
return sum;
}
int mid = (start + end) / 2;
FreightTask left = new FreightTask(shipments, start, mid);
FreightTask right = new FreightTask(shipments, mid, end);
left.fork(); // 비동기
BigDecimal rightResult = right.compute(); // 직접
BigDecimal leftResult = left.join(); // 대기
return leftResult.add(rightResult); // 합침
}
}
}
fork()와 join()의 동작은?
답:
1. fork():
join():
패턴:
순서:
ForkJoinTask:
ForkJoinPool 의 작업 단위.
- fork/join 가능
하위 클래스:
- RecursiveTask<V>: 결과 있음
- RecursiveAction: 결과 없음
// RecursiveTask — 결과 있음
class SumTask extends RecursiveTask<Long> {
protected Long compute() {
return sum; // 결과 반환
}
}
// RecursiveAction — 결과 없음
class ProcessTask extends RecursiveAction {
protected void compute() {
process(); // 반환 X
}
}
commonPool:
ForkJoinPool 의 전역 공유 풀.
- JVM 하나
- 병렬도 = 코어 수 - 1
- 별도 지정 안 하면 사용
// commonPool 사용처
// 1. 직접
ForkJoinPool.commonPool().invoke(task);
// 2. CompletableFuture (기본)
CompletableFuture.supplyAsync(() -> task()); // commonPool
// 3. 병렬 스트림
list.parallelStream().map(...).collect(...); // commonPool
commonPool 주의:
전역 공유:
- 모든 곳에서 공유
- 블로킹 작업 시 고갈
- 다른 작업 영향
→ 블로킹 작업은 커스텀 풀
@Service
public class ForkJoinTaskExample {
// RecursiveTask — 결과 있음 (합산)
static class SumTask extends RecursiveTask<BigDecimal> {
// compute 가 BigDecimal 반환
protected BigDecimal compute() { return BigDecimal.ZERO; }
}
// RecursiveAction — 결과 없음 (처리)
static class ProcessAction extends RecursiveAction {
// compute 가 void
protected void compute() { }
}
// commonPool 회피 — 커스텀 풀
private final ForkJoinPool customPool = new ForkJoinPool(8);
public BigDecimal sumWithCustomPool(List<Shipment> shipments) {
SumTask task = new SumTask();
return customPool.invoke(task); // 커스텀 (commonPool X)
}
// 병렬 스트림 — commonPool 사용 (주의)
public BigDecimal sumWithParallelStream(List<Shipment> shipments) {
return shipments.parallelStream() // commonPool
.map(Shipment::getWeight)
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
}
ForkJoinTask와 commonPool은?
답:
1. ForkJoinTask:
RecursiveTask vs Action:
commonPool:
주의:
| 항목 | ThreadPoolExecutor | ForkJoinPool |
|---|---|---|
| 큐 | 단일 공유 | 분산 (워커별 deque) |
| 부하 분산 | X | work stealing |
| 작업 | 독립적 | 분할 정복 (재귀) |
| 용도 | 일반 작업 | CPU 집약 분할 |
큐 구조 차이:
ThreadPoolExecutor:
- 단일 작업 큐
- 모든 워커 공유
- 경쟁 (락)
ForkJoinPool:
- 워커별 deque
- 분산
- work stealing
작업 유형:
ThreadPoolExecutor:
- 독립적 작업
- submit 으로 제출
- 일반 비동기
ForkJoinPool:
- 분할 정복
- fork/join
- 재귀 작업
선택:
ThreadPoolExecutor:
- 독립 작업
- I/O 포함
- 일반 서버
ForkJoinPool:
- CPU 집약
- 분할 가능
- 재귀 (정렬, 합산)
함께 사용:
- 일반 작업: ThreadPoolExecutor
- 분할 정복: ForkJoinPool
- CompletableFuture: 둘 다 (Executor 지정)
용도별 분리
@Configuration
public class PoolComparison {
// ThreadPoolExecutor — 일반 작업 (I/O 포함)
@Bean("generalPool")
public ExecutorService generalPool() {
return new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 배송 처리, API 호출 등 독립 작업
}
// ForkJoinPool — CPU 집약 분할 정복
@Bean("computePool")
public ForkJoinPool computePool() {
return new ForkJoinPool(
Runtime.getRuntime().availableProcessors());
// 대량 데이터 합산, 정렬 등
}
}
ThreadPoolExecutor와 ForkJoinPool 차이는?
답:
1. 큐:
부하 분산:
작업:
선택:
병렬 스트림:
parallelStream() / stream().parallel()
- 내부적으로 ForkJoinPool
- commonPool 사용
- 분할 정복 자동
// 병렬 스트림 = ForkJoinPool
list.parallelStream()
.map(x -> transform(x))
.reduce(0, Integer::sum);
// 내부:
// - 스트림 분할 (spliterator)
// - ForkJoinPool.commonPool
// - work stealing
// - 결과 합침
병렬 스트림 commonPool 공유:
모든 병렬 스트림:
- commonPool 공유
- 블로킹 작업 시 고갈
- 서로 영향
→ 블로킹 병렬 스트림 주의
// 병렬 스트림 + 커스텀 풀
ForkJoinPool customPool = new ForkJoinPool(8);
customPool.submit(() ->
list.parallelStream()
.map(x -> transform(x))
.collect(Collectors.toList())
).get();
// commonPool 대신 커스텀
// 블로킹 작업 격리
병렬 스트림 주의:
- commonPool 공유 (격리 X)
- 블로킹 작업 부적합
- 작은 데이터는 오버헤드
- 순서 보장 X (대부분)
적합:
- CPU 집약
- 큰 데이터
- 독립 연산
@Service
public class ParallelStreamRelation {
// 병렬 스트림 (commonPool)
public BigDecimal sumFreightParallel(List<Shipment> shipments) {
return shipments.parallelStream() // ForkJoinPool.commonPool
.map(Shipment::getWeight)
.reduce(BigDecimal.ZERO, BigDecimal::add);
// CPU 집약 + 큰 데이터 → 적합
}
// 커스텀 풀로 격리 (블로킹 작업 시)
private final ForkJoinPool customPool = new ForkJoinPool(8);
public List<Result> processParallelIsolated(List<Shipment> shipments)
throws Exception {
return customPool.submit(() ->
shipments.parallelStream()
.map(this::process)
.toList()
).get();
// commonPool 격리 (다른 작업 영향 X)
}
// ❌ 병렬 스트림에 블로킹 (commonPool 고갈)
public void badBlockingParallel(List<Shipment> shipments) {
shipments.parallelStream()
.forEach(s -> {
callSlowExternalApi(s); // 블로킹 → commonPool 고갈
// 다른 병렬 작업 영향
});
}
private Result process(Shipment s) { return new Result(); }
private void callSlowExternalApi(Shipment s) { }
record Result() {}
}
병렬 스트림과 ForkJoinPool의 관계는?
답:
1. 관계:
내부:
commonPool 공유:
주의:
| Q | 핵심 답변 |
|---|---|
| ForkJoinPool? | 분할 정복 + work stealing |
| 분할 정복? | 분할 → 정복 → 결합 |
| work stealing? | 자기 큐 + 훔치기 |
| deque? | 양방향, 워커별 |
| LIFO/FIFO? | 자기 LIFO, 훔치기 FIFO |
| fork()? | 비동기 (deque push) |
| join()? | 결과 대기 |
| RecursiveTask/Action? | 결과 O/X |
| commonPool? | 전역 공유 |
| 병렬 스트림? | commonPool 사용 |
답:
답:
답:
답:
답:
1. ForkJoinPool
2. work stealing
3. 특징
이번 Unit에서 ForkJoinPool 을 봤다면, 다음은 RecursiveTask (4주차 마지막).
🚀 Phase 8 — 고급 비동기
✅ Unit 8.1 CompletableFuture (★ 마스터)
✅ Unit 8.2 ForkJoinPool ← 여기
⏭ Unit 8.3 RecursiveTask — 4주차 최종 완주
✅ Phase 1~7 (32 Unit, 1·2차 정점 완료)
🚀 Phase 8 — 고급 비동기 (2/3 진행)
총: 34/35 Unit