ReentrantLock + Condition으로 동기화 구현 (예제 4 - BoundedQueueV4)기존 문제점 요약
synchronized + wait/notify 방식에서는 스레드 대기 집합이 하나뿐이었습니다.개선 방향: “생산자는 소비자만 깨우고, 소비자는 생산자만 깨우도록 하자!”
이를 위해선:
예제4: ReentrantLock 기반 대기 구현
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
Condition: wait()처럼 스레드를 대기시키는 구조, 단 명시적 락과 함께 사용합니다.await(): 현재 스레드를 락 반납 후 대기 상태로 둡니다.signal(): 대기 중인 스레드를 깨웁니다.구조적 특징
| 구조 | 설명 |
|---|---|
lock.lock() / unlock() | synchronized의 대체 |
condition.await() | wait()과 동일. 락 반납 후 대기 |
condition.signal() | notify()과 동일. 하나 깨움 |
| 단점 | 여전히 스레드 구분 없이 깨움 → 생산자-소비자 혼합 대기 상태 유지 |
코드
package thread.bounded;
import static util.MyLogger.log;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueueV4 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV4(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
condition.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
condition.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
실행 결과
14:32:57.942 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV4 ==
14:32:57.944 [ main] 생산자 시작
14:32:57.957 [producer1] [생산 시도] data1 -> []
14:32:57.957 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:32:57.957 [producer1] [생산 완료] data1 -> [data1]
14:32:58.064 [producer2] [생산 시도] data2 -> [data1]
14:32:58.065 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:32:58.065 [producer2] [생산 완료] data2 -> [data1, data2]
14:32:58.169 [producer3] [생산 시도] data3 -> [data1, data2]
14:32:58.169 [producer3] [put] 큐가 가득 참, 생산자 대기
14:32:58.275 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:32:58.276 [ main] producer1: TERMINATED
14:32:58.276 [ main] producer2: TERMINATED
14:32:58.277 [ main] producer3: WAITING
14:32:58.277 [ main] 소비자 시작
14:32:58.279 [consumer1] [소비 시도] ? <- [data1, data2]
14:32:58.279 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:32:58.279 [producer3] [put] 생산자 깨어남
14:32:58.279 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:32:58.279 [producer3] [생산 완료] data3 -> [data2, data3]
14:32:58.279 [consumer1] [소비 완료] data1 <- [data2]
14:32:58.379 [consumer2] [소비 시도] ? <- [data2, data3]
14:32:58.380 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:32:58.380 [consumer2] [소비 완료] data2 <- [data3]
14:32:58.485 [consumer3] [소비 시도] ? <- [data3]
14:32:58.485 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:32:58.485 [consumer3] [소비 완료] data3 <- []
14:32:58.591 [ main] 현재 상태 출력, 큐 데이터: []
14:32:58.591 [ main] producer1: TERMINATED
14:32:58.591 [ main] producer2: TERMINATED
14:32:58.591 [ main] producer3: TERMINATED
14:32:58.591 [ main] consumer1: TERMINATED
14:32:58.591 [ main] consumer2: TERMINATED
14:32:58.592 [ main] consumer3: TERMINATED
14:32:58.592 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV4 ==
14:33:26.174 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV4 ==
14:33:26.176 [ main] 소비자 시작
14:33:26.182 [consumer1] [소비 시도] ? <- []
14:33:26.182 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.293 [consumer2] [소비 시도] ? <- []
14:33:26.293 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.398 [consumer3] [소비 시도] ? <- []
14:33:26.399 [consumer3] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.503 [ main] 현재 상태 출력, 큐 데이터: []
14:33:26.503 [ main] consumer1: WAITING
14:33:26.503 [ main] consumer2: WAITING
14:33:26.504 [ main] consumer3: WAITING
14:33:26.504 [ main] 생산자 시작
14:33:26.506 [producer1] [생산 시도] data1 -> []
14:33:26.506 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:33:26.506 [consumer1] [take] 소비자 깨어남
14:33:26.506 [producer1] [생산 완료] data1 -> [data1]
14:33:26.506 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:33:26.507 [consumer2] [take] 소비자 깨어남
14:33:26.507 [consumer1] [소비 완료] data1 <- []
14:33:26.507 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.609 [producer2] [생산 시도] data2 -> []
14:33:26.610 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:33:26.610 [producer2] [생산 완료] data2 -> [data2]
14:33:26.610 [consumer3] [take] 소비자 깨어남
14:33:26.610 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:33:26.610 [consumer3] [소비 완료] data2 <- []
14:33:26.610 [consumer2] [take] 소비자 깨어남
14:33:26.610 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.714 [producer3] [생산 시도] data3 -> []
14:33:26.714 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:33:26.715 [producer3] [생산 완료] data3 -> [data3]
14:33:26.715 [consumer2] [take] 소비자 깨어남
14:33:26.716 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:33:26.716 [consumer2] [소비 완료] data3 <- []
14:33:26.822 [ main] 현재 상태 출력, 큐 데이터: []
14:33:26.822 [ main] consumer1: TERMINATED
14:33:26.823 [ main] consumer2: TERMINATED
14:33:26.823 [ main] consumer3: TERMINATED
14:33:26.823 [ main] producer1: TERMINATED
14:33:26.823 [ main] producer2: TERMINATED
14:33:26.823 [ main] producer3: TERMINATED
14:33:26.823 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV4 ==
실행 결과 요약
put() 하자마자 소비자가 깨어나서 처리합니다.WAITING 진입 → 소비자가 처리 후 깨어납니다.wait/notify 버전과 동일합니다.정리
ReentrantLock + Condition은 synchronized보다 세분화된 제어가 가능합니다.문제의 본질
condition.signal()이 모든 대기 스레드 중 무작위로 1개를 깨웁니다.예시 상황
queue.isFull() 상태에서 put() → await()take()가 실행되고 signal() → 다시 put()이 깨어남 → 조건 미충족 → 다시 대기개선 전략: Condition을 생산자/소비자 용도로 분리하여 올바른 종류의 스레드만 깨우도록 제어합니다.
코드 구조
package thread.bounded;
import static util.MyLogger.log;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueueV5 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
실행 결과
14:36:35.759 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV5 ==
14:36:35.761 [ main] 생산자 시작
14:36:35.769 [producer1] [생산 시도] data1 -> []
14:36:35.769 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:36:35.770 [producer1] [생산 완료] data1 -> [data1]
14:36:35.874 [producer2] [생산 시도] data2 -> [data1]
14:36:35.875 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:36:35.875 [producer2] [생산 완료] data2 -> [data1, data2]
14:36:35.979 [producer3] [생산 시도] data3 -> [data1, data2]
14:36:35.979 [producer3] [put] 큐가 가득 참, 생산자 대기
14:36:36.084 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:36:36.084 [ main] producer1: TERMINATED
14:36:36.085 [ main] producer2: TERMINATED
14:36:36.085 [ main] producer3: WAITING
14:36:36.085 [ main] 소비자 시작
14:36:36.087 [consumer1] [소비 시도] ? <- [data1, data2]
14:36:36.087 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:36:36.087 [producer3] [put] 생산자 깨어남
14:36:36.087 [consumer1] [소비 완료] data1 <- [data2]
14:36:36.087 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:36:36.088 [producer3] [생산 완료] data3 -> [data2, data3]
14:36:36.190 [consumer2] [소비 시도] ? <- [data2, data3]
14:36:36.190 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:36:36.190 [consumer2] [소비 완료] data2 <- [data3]
14:36:36.295 [consumer3] [소비 시도] ? <- [data3]
14:36:36.296 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:36:36.296 [consumer3] [소비 완료] data3 <- []
14:36:36.401 [ main] 현재 상태 출력, 큐 데이터: []
14:36:36.401 [ main] producer1: TERMINATED
14:36:36.401 [ main] producer2: TERMINATED
14:36:36.401 [ main] producer3: TERMINATED
14:36:36.401 [ main] consumer1: TERMINATED
14:36:36.402 [ main] consumer2: TERMINATED
14:36:36.402 [ main] consumer3: TERMINATED
14:36:36.402 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV5 ==
14:35:48.513 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV5 ==
14:35:48.514 [ main] 소비자 시작
14:35:48.520 [consumer1] [소비 시도] ? <- []
14:35:48.520 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:35:48.624 [consumer2] [소비 시도] ? <- []
14:35:48.624 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:35:48.729 [consumer3] [소비 시도] ? <- []
14:35:48.729 [consumer3] [take] 큐에 데이터가 없음, 소비자 대기
14:35:48.835 [ main] 현재 상태 출력, 큐 데이터: []
14:35:48.836 [ main] consumer1: WAITING
14:35:48.836 [ main] consumer2: WAITING
14:35:48.836 [ main] consumer3: WAITING
14:35:48.836 [ main] 생산자 시작
14:35:48.838 [producer1] [생산 시도] data1 -> []
14:35:48.838 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:35:48.838 [consumer1] [take] 소비자 깨어남
14:35:48.838 [producer1] [생산 완료] data1 -> [data1]
14:35:48.838 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:35:48.839 [consumer1] [소비 완료] data1 <- []
14:35:48.942 [producer2] [생산 시도] data2 -> []
14:35:48.942 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:35:48.942 [producer2] [생산 완료] data2 -> [data2]
14:35:48.942 [consumer2] [take] 소비자 깨어남
14:35:48.943 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:35:48.943 [consumer2] [소비 완료] data2 <- []
14:35:49.047 [producer3] [생산 시도] data3 -> []
14:35:49.048 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:35:49.048 [consumer3] [take] 소비자 깨어남
14:35:49.048 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:35:49.048 [producer3] [생산 완료] data3 -> [data3]
14:35:49.048 [consumer3] [소비 완료] data3 <- []
14:35:49.152 [ main] 현재 상태 출력, 큐 데이터: []
14:35:49.152 [ main] consumer1: TERMINATED
14:35:49.152 [ main] consumer2: TERMINATED
14:35:49.153 [ main] consumer3: TERMINATED
14:35:49.153 [ main] producer1: TERMINATED
14:35:49.153 [ main] producer2: TERMINATED
14:35:49.153 [ main] producer3: TERMINATED
14:35:49.153 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV5 ==
효과 요약
| 항목 | 설명 |
|---|---|
| 대기 분리 | 생산자, 소비자 스레드가 서로 다른 대기 공간에 들어감 |
| 신호 최적화 | 생산자 → 소비자만 깨움 / 소비자 → 생산자만 깨움 |
| 효율성 향상 | 불필요한 깨어남 없음 → 반복 대기 감소 |
핵심 요약
| 구성 요소 | 설명 |
|---|---|
notFull | 생산자가 큐가 꽉 찼을 때 대기 |
notEmpty | 소비자가 큐가 비었을 때 대기 |
signal() | 정확한 대기 그룹만 선택적으로 깨움 |
while | 깨어나도 조건을 다시 검사 (spurious wakeup 대비) |
실무에서도 매우 중요
Condition을 목적별로 분리해 사용하는 것은 고급 동기화 설계의 핵심입니다.BlockingQueue를 이용한 고수준 동기화핵심 개념: BlockingQueue
자바가 제공하는 동기화 내장 큐로, 생산자-소비자 구조를 직접 구현하지 않아도 자동으로 락 / 대기 / 신호 처리가 됩니다.
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
ReentrantLock, Condition, wait/notify를 이미 구현해 둡니다.put()과 take() 메서드가 자동으로 대기 / 신호 처리코드가 간결해지고, 락 관리, 대기/신호 처리가 모두 라이브러리 내부에서 처리됩니다.
주요 메서드 비교
| 메서드 | 설명 | 동작 방식 |
|---|---|---|
put() | 큐가 꽉 차면 자동으로 대기 | 내부적으로 await() |
take() | 큐가 비면 자동으로 대기 | 내부적으로 await() |
offer() | 비동기 삽입, 실패 시 false 반환 | 대기 X |
poll() | 비동기 제거, 실패 시 null 반환 | 대기 X |
BlockingQueue 장점
| 항목 | 설명 |
|---|---|
| 코드 복잡도 | 매우 간결 |
| 락/조건/대기 | 내부에서 자동 처리 |
| 실무 사용 | 생산자-소비자 구조의 표준 구현체 |
| 대표 클래스 | ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue 등 |
BlockingQueue 단점
notify, 대기 시간 제한 등은 추가 조정 필요핵심 정리
BlockingQueue는 생산자-소비자 문제의 최종 해법에 가까운 고수준 도구입니다.BlockingQueue 버전별 비교 (V1 ~ V4)V1 - 기본 put() / take() 사용
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
15:00:28.523 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_1 ==
15:00:28.525 [ main] 생산자 시작
15:00:28.537 [producer1] [생산 시도] data1 -> []
15:00:28.537 [producer1] [생산 완료] data1 -> [data1]
15:00:28.642 [producer2] [생산 시도] data2 -> [data1]
15:00:28.642 [producer2] [생산 완료] data2 -> [data1, data2]
15:00:28.747 [producer3] [생산 시도] data3 -> [data1, data2]
15:00:28.853 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:00:28.854 [ main] producer1: TERMINATED
15:00:28.854 [ main] producer2: TERMINATED
15:00:28.854 [ main] producer3: WAITING
15:00:28.854 [ main] 소비자 시작
15:00:28.856 [consumer1] [소비 시도] ? <- [data1, data2]
15:00:28.856 [producer3] [생산 완료] data3 -> [data2, data3]
15:00:28.856 [consumer1] [소비 완료] data1 <- [data2]
15:00:28.958 [consumer2] [소비 시도] ? <- [data2, data3]
15:00:28.958 [consumer2] [소비 완료] data2 <- [data3]
15:00:29.064 [consumer3] [소비 시도] ? <- [data3]
15:00:29.064 [consumer3] [소비 완료] data3 <- []
15:00:29.170 [ main] 현재 상태 출력, 큐 데이터: []
15:00:29.170 [ main] producer1: TERMINATED
15:00:29.171 [ main] producer2: TERMINATED
15:00:29.171 [ main] producer3: TERMINATED
15:00:29.171 [ main] consumer1: TERMINATED
15:00:29.171 [ main] consumer2: TERMINATED
15:00:29.171 [ main] consumer3: TERMINATED
15:00:29.172 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_1 ==
15:00:47.647 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV6_1 ==
15:00:47.648 [ main] 소비자 시작
15:00:47.652 [consumer1] [소비 시도] ? <- []
15:00:47.754 [consumer2] [소비 시도] ? <- []
15:00:47.860 [consumer3] [소비 시도] ? <- []
15:00:47.965 [ main] 현재 상태 출력, 큐 데이터: []
15:00:47.966 [ main] consumer1: WAITING
15:00:47.966 [ main] consumer2: WAITING
15:00:47.967 [ main] consumer3: WAITING
15:00:47.967 [ main] 생산자 시작
15:00:47.968 [producer1] [생산 시도] data1 -> []
15:00:47.969 [producer1] [생산 완료] data1 -> [data1]
15:00:47.969 [consumer1] [소비 완료] data1 <- []
15:00:48.071 [producer2] [생산 시도] data2 -> []
15:00:48.072 [consumer2] [소비 완료] data2 <- []
15:00:48.072 [producer2] [생산 완료] data2 -> [data2]
15:00:48.177 [producer3] [생산 시도] data3 -> []
15:00:48.177 [producer3] [생산 완료] data3 -> [data3]
15:00:48.178 [consumer3] [소비 완료] data3 <- []
15:00:48.282 [ main] 현재 상태 출력, 큐 데이터: []
15:00:48.283 [ main] consumer1: TERMINATED
15:00:48.283 [ main] consumer2: TERMINATED
15:00:48.283 [ main] consumer3: TERMINATED
15:00:48.283 [ main] producer1: TERMINATED
15:00:48.283 [ main] producer2: TERMINATED
15:00:48.283 [ main] producer3: TERMINATED
15:00:48.283 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV6_1 ==
put(), take()만 호출의 특징
자바가 제공하는 BlockingQueue의 기본 동작을 사용합니다.
큐가 가득 차면 자동 대기, 비어 있으면 자동 대기하빈다.
장점
한계
InterruptedException은 런타임 예외로 던지고 있어 예외 처리 전략이 다소 단순합니다.Thread.currentThread().interrupt()로 복구 가능성 고려 필요합니다.결론: 실무에서 가장 추천되는 안정형 구조입니다.
V2 - offer() / poll() 사용 (비차단 방식)
package thread.bounded;
import static util.MyLogger.log;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_2 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_2(int max) {
queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
boolean result = queue.offer(data);
log("저장 시도 결과 = " + result);
}
@Override
public String take() {
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
15:01:08.344 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_2 ==
15:01:08.344 [ main] 생산자 시작
15:01:08.355 [producer1] [생산 시도] data1 -> []
15:01:08.355 [producer1] 저장 시도 결과 = true
15:01:08.356 [producer1] [생산 완료] data1 -> [data1]
15:01:08.452 [producer2] [생산 시도] data2 -> [data1]
15:01:08.452 [producer2] 저장 시도 결과 = true
15:01:08.452 [producer2] [생산 완료] data2 -> [data1, data2]
15:01:08.557 [producer3] [생산 시도] data3 -> [data1, data2]
15:01:08.557 [producer3] 저장 시도 결과 = false
15:01:08.558 [producer3] [생산 완료] data3 -> [data1, data2]
15:01:08.663 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:01:08.663 [ main] producer1: TERMINATED
15:01:08.664 [ main] producer2: TERMINATED
15:01:08.664 [ main] producer3: TERMINATED
15:01:08.665 [ main] 소비자 시작
15:01:08.666 [consumer1] [소비 시도] ? <- [data1, data2]
15:01:08.666 [consumer1] [소비 완료] data1 <- [data2]
15:01:08.769 [consumer2] [소비 시도] ? <- [data2]
15:01:08.769 [consumer2] [소비 완료] data2 <- []
15:01:08.875 [consumer3] [소비 시도] ? <- []
15:01:08.876 [consumer3] [소비 완료] null <- []
15:01:08.979 [ main] 현재 상태 출력, 큐 데이터: []
15:01:08.979 [ main] producer1: TERMINATED
15:01:08.979 [ main] producer2: TERMINATED
15:01:08.979 [ main] producer3: TERMINATED
15:01:08.980 [ main] consumer1: TERMINATED
15:01:08.980 [ main] consumer2: TERMINATED
15:01:08.980 [ main] consumer3: TERMINATED
15:01:08.980 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_2 ==
15:01:30.593 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV6_2 ==
15:01:30.594 [ main] 소비자 시작
15:01:30.602 [consumer1] [소비 시도] ? <- []
15:01:30.610 [consumer1] [소비 완료] null <- []
15:01:30.717 [consumer2] [소비 시도] ? <- []
15:01:30.717 [consumer2] [소비 완료] null <- []
15:01:30.823 [consumer3] [소비 시도] ? <- []
15:01:30.823 [consumer3] [소비 완료] null <- []
15:01:30.928 [ main] 현재 상태 출력, 큐 데이터: []
15:01:30.929 [ main] consumer1: TERMINATED
15:01:30.929 [ main] consumer2: TERMINATED
15:01:30.929 [ main] consumer3: TERMINATED
15:01:30.929 [ main] 생산자 시작
15:01:30.932 [producer1] [생산 시도] data1 -> []
15:01:30.932 [producer1] 저장 시도 결과 = true
15:01:30.932 [producer1] [생산 완료] data1 -> [data1]
15:01:31.034 [producer2] [생산 시도] data2 -> [data1]
15:01:31.034 [producer2] 저장 시도 결과 = true
15:01:31.034 [producer2] [생산 완료] data2 -> [data1, data2]
15:01:31.139 [producer3] [생산 시도] data3 -> [data1, data2]
15:01:31.140 [producer3] 저장 시도 결과 = false
15:01:31.140 [producer3] [생산 완료] data3 -> [data1, data2]
15:01:31.245 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:01:31.245 [ main] consumer1: TERMINATED
15:01:31.245 [ main] consumer2: TERMINATED
15:01:31.245 [ main] consumer3: TERMINATED
15:01:31.245 [ main] producer1: TERMINATED
15:01:31.245 [ main] producer2: TERMINATED
15:01:31.245 [ main] producer3: TERMINATED
15:01:31.246 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV6_2 ==
offer() / poll() 사용 (비차단 방식)의 특징
장점
스레드 블로킹 없습니다.
시스템이 멈추지 않고 비동기 처리가 필요한 상황에 적합합니다. (예: 로깅 큐, 이벤트 처리)
한계
큐가 가득 차면 offer()는 실패 (false) → 데이터 손실 가능합니다.
큐가 비어 있으면 poll()은 null 반환 → 소비 실패 처리 필요합니다.
결론: 실시간성 우선 또는 유실 허용 가능한 로직에 적합합니다.
V3 - offer() / poll() with timeout 사용
package thread.bounded;
import static util.MyLogger.log;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BoundedQueueV6_3 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_3(int max) {
queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
// 대기 시간 설정 가능
boolean result = queue.offer(data, 1, TimeUnit.NANOSECONDS);
log("저장 시도 결과 = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
// 대기 시간 설정 가능
return queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
15:02:03.062 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_3 ==
15:02:03.063 [ main] 생산자 시작
15:02:03.071 [producer1] [생산 시도] data1 -> []
15:02:03.072 [producer1] 저장 시도 결과 = true
15:02:03.072 [producer1] [생산 완료] data1 -> [data1]
15:02:03.171 [producer2] [생산 시도] data2 -> [data1]
15:02:03.172 [producer2] 저장 시도 결과 = true
15:02:03.172 [producer2] [생산 완료] data2 -> [data1, data2]
15:02:03.276 [producer3] [생산 시도] data3 -> [data1, data2]
15:02:03.276 [producer3] 저장 시도 결과 = false
15:02:03.276 [producer3] [생산 완료] data3 -> [data1, data2]
15:02:03.381 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:02:03.381 [ main] producer1: TERMINATED
15:02:03.382 [ main] producer2: TERMINATED
15:02:03.382 [ main] producer3: TERMINATED
15:02:03.382 [ main] 소비자 시작
15:02:03.383 [consumer1] [소비 시도] ? <- [data1, data2]
15:02:03.383 [consumer1] [소비 완료] data1 <- [data2]
15:02:03.487 [consumer2] [소비 시도] ? <- [data2]
15:02:03.488 [consumer2] [소비 완료] data2 <- []
15:02:03.592 [consumer3] [소비 시도] ? <- []
15:02:03.698 [ main] 현재 상태 출력, 큐 데이터: []
15:02:03.698 [ main] producer1: TERMINATED
15:02:03.699 [ main] producer2: TERMINATED
15:02:03.699 [ main] producer3: TERMINATED
15:02:03.699 [ main] consumer1: TERMINATED
15:02:03.699 [ main] consumer2: TERMINATED
15:02:03.699 [ main] consumer3: TIMED_WAITING
15:02:03.700 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_3 ==
15:02:31.702 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV6_3 ==
15:02:31.704 [ main] 소비자 시작
15:02:31.711 [consumer1] [소비 시도] ? <- []
15:02:31.823 [consumer2] [소비 시도] ? <- []
15:02:31.928 [consumer3] [소비 시도] ? <- []
15:02:32.033 [ main] 현재 상태 출력, 큐 데이터: []
15:02:32.034 [ main] consumer1: TIMED_WAITING
15:02:32.034 [ main] consumer2: TIMED_WAITING
15:02:32.035 [ main] consumer3: TIMED_WAITING
15:02:32.035 [ main] 생산자 시작
15:02:32.037 [producer1] [생산 시도] data1 -> []
15:02:32.038 [consumer1] [소비 완료] data1 <- []
15:02:32.038 [producer1] 저장 시도 결과 = true
15:02:32.038 [producer1] [생산 완료] data1 -> []
15:02:32.139 [producer2] [생산 시도] data2 -> []
15:02:32.139 [producer2] 저장 시도 결과 = true
15:02:32.139 [consumer2] [소비 완료] data2 <- []
15:02:32.139 [producer2] [생산 완료] data2 -> []
15:02:32.244 [producer3] [생산 시도] data3 -> []
15:02:32.245 [producer3] 저장 시도 결과 = true
15:02:32.245 [consumer3] [소비 완료] data3 <- []
15:02:32.245 [producer3] [생산 완료] data3 -> []
15:02:32.349 [ main] 현재 상태 출력, 큐 데이터: []
15:02:32.349 [ main] consumer1: TERMINATED
15:02:32.349 [ main] consumer2: TERMINATED
15:02:32.349 [ main] consumer3: TERMINATED
15:02:32.349 [ main] producer1: TERMINATED
15:02:32.350 [ main] producer2: TERMINATED
15:02:32.350 [ main] producer3: TERMINATED
15:02:32.350 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV6_3 ==
특징
제한된 시간 동안 대기하고, 시간이 지나면 포기합니다.
유연한 동작 제어가 가능합니다.
장점
블로킹을 완전히 피하지 않으면서도, 시스템 정지 없이 일정 시간만 기다립니다.
예측 가능한 반응 시간 확보가 필수입니다.
한계
결론: 반응성과 안정성의 균형을 원하는 경우 이상적입니다.
V4 - add() / remove() 사용 (예외 기반 처리)
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_4 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_4(int max) {
queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
queue.add(data); // java.lang.IllegalStateException: Queue full
}
@Override
public String take() {
return queue.remove(); // java.util.NoSuchElementException
}
@Override
public String toString() {
return queue.toString();
}
}
15:02:50.290 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_4 ==
15:02:50.291 [ main] 생산자 시작
15:02:50.301 [producer1] [생산 시도] data1 -> []
15:02:50.301 [producer1] [생산 완료] data1 -> [data1]
15:02:50.400 [producer2] [생산 시도] data2 -> [data1]
15:02:50.401 [producer2] [생산 완료] data2 -> [data1, data2]
15:02:50.505 [producer3] [생산 시도] data3 -> [data1, data2]
Exception in thread "producer3" java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:16)
at thread.bounded.ProducerTask.run(ProducerTask.java:18)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:02:50.610 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:02:50.611 [ main] producer1: TERMINATED
15:02:50.611 [ main] producer2: TERMINATED
15:02:50.611 [ main] producer3: TERMINATED
15:02:50.611 [ main] 소비자 시작
15:02:50.614 [consumer1] [소비 시도] ? <- [data1, data2]
15:02:50.614 [consumer1] [소비 완료] data1 <- [data2]
15:02:50.718 [consumer2] [소비 시도] ? <- [data2]
15:02:50.718 [consumer2] [소비 완료] data2 <- []
15:02:50.824 [consumer3] [소비 시도] ? <- []
Exception in thread "consumer3" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:02:50.930 [ main] 현재 상태 출력, 큐 데이터: []
15:02:50.930 [ main] producer1: TERMINATED
15:02:50.930 [ main] producer2: TERMINATED
15:02:50.930 [ main] producer3: TERMINATED
15:02:50.931 [ main] consumer1: TERMINATED
15:02:50.931 [ main] consumer2: TERMINATED
15:02:50.931 [ main] consumer3: TERMINATED
15:02:50.931 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_4 ==
15:03:09.760 [ main] == [소비자 먼저 실행] 시작, BoundedQueueV6_4 ==
15:03:09.761 [ main] 소비자 시작
15:03:09.767 [consumer1] [소비 시도] ? <- []
Exception in thread "consumer1" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:09.878 [consumer2] [소비 시도] ? <- []
Exception in thread "consumer2" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:09.985 [consumer3] [소비 시도] ? <- []
Exception in thread "consumer3" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:10.090 [ main] 현재 상태 출력, 큐 데이터: []
15:03:10.091 [ main] consumer1: TERMINATED
15:03:10.091 [ main] consumer2: TERMINATED
15:03:10.091 [ main] consumer3: TERMINATED
15:03:10.091 [ main] 생산자 시작
15:03:10.094 [producer1] [생산 시도] data1 -> []
15:03:10.094 [producer1] [생산 완료] data1 -> [data1]
15:03:10.197 [producer2] [생산 시도] data2 -> [data1]
15:03:10.197 [producer2] [생산 완료] data2 -> [data1, data2]
15:03:10.302 [producer3] [생산 시도] data3 -> [data1, data2]
Exception in thread "producer3" java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:16)
at thread.bounded.ProducerTask.run(ProducerTask.java:18)
at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:10.408 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:03:10.409 [ main] consumer1: TERMINATED
15:03:10.409 [ main] consumer2: TERMINATED
15:03:10.409 [ main] consumer3: TERMINATED
15:03:10.409 [ main] producer1: TERMINATED
15:03:10.409 [ main] producer2: TERMINATED
15:03:10.409 [ main] producer3: TERMINATED
15:03:10.410 [ main] == [소비자 먼저 실행] 종료, BoundedQueueV6_4 ==
특징
Queue full, NoSuchElementException)장점
한계
결론: 예외 상황 테스트나 교육용 데모에는 적합하지만, 실제 서비스 로직에는 추천하지 않습니다.
최종 비교 요약표
| 버전 | 사용 메서드 | 대기 방식 | 장점 | 단점 | 적합한 상황 |
|---|---|---|---|---|---|
| V6_1 | put() / take() | 무제한 대기 | 가장 안정적, 동기화 완비 | 인터럽트 처리 미흡 | 일반적인 생산자-소비자 처리 |
| V6_2 | offer() / poll() | 대기 없음 | 빠름, 유실 감수 | 데이터 손실 가능성 | 로깅, 모니터링 큐 |
| V6_3 | offer(timeout) / poll(timeout) | 제한 대기 | 반응성과 안정성의 균형 | 타임아웃 튜닝 필요 | 실시간 + 안정성 요구 |
| V6_4 | add() / remove() | 예외로 제어 | 에러 감지 용이 | 예외 남발 비효율 | 테스트, 예외 케이스 학습용 |
ddsss
실무 팁:
offer() + 결과 확인으로 대체 추천