9. 생산자 소비자 문제2

임대일·2025년 5월 13일

Thread

목록 보기
9/13
post-thumbnail

1. ReentrantLock + Condition으로 동기화 구현 (예제 4 - BoundedQueueV4)

기존 문제점 요약

  • 이전 synchronized + wait/notify 방식에서는 스레드 대기 집합이 하나뿐이었습니다.
  • → 생산자가 생산자를 깨우거나, 소비자가 소비자를 깨우는 비효율이 발생 했습니다.

개선 방향: “생산자는 소비자만 깨우고, 소비자는 생산자만 깨우도록 하자!”

이를 위해선:

  1. 대기 집합을 생산자용 / 소비자용으로 분리
  2. 각 그룹이 서로만 깨우도록 설계

예제4: ReentrantLock 기반 대기 구현

private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
  • Condition: wait()처럼 스레드를 대기시키는 구조, 단 명시적 락과 함께 사용합니다.
  • await(): 현재 스레드를 락 반납 후 대기 상태로 둡니다.
  • signal(): 대기 중인 스레드를 깨웁니다.

구조적 특징

구조설명
lock.lock() / unlock()synchronized의 대체
condition.await()wait()과 동일. 락 반납 후 대기
condition.signal()notify()과 동일. 하나 깨움
단점여전히 스레드 구분 없이 깨움 → 생산자-소비자 혼합 대기 상태 유지

코드

package thread.bounded;

import static util.MyLogger.log;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueueV4 implements BoundedQueue {

  private final Lock lock = new ReentrantLock();
  private final Condition condition = lock.newCondition();

  private final Queue<String> queue = new ArrayDeque<>();
  private final int max;

  public BoundedQueueV4(int max) {
    this.max = max;
  }

  @Override
  public void put(String data) {
    lock.lock();
    try {
      while (queue.size() == max) {
        log("[put] 큐가 가득 참, 생산자 대기");
        try {
          condition.await();
          log("[put] 생산자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      queue.offer(data);
      log("[put] 생산자 데이터 저장, signal() 호출");
      condition.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String take() {
    lock.lock();
    try {
      while (queue.isEmpty()) {
        log("[take] 큐에 데이터가 없음, 소비자 대기");
        try {
          condition.await();
          log("[take] 소비자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      String data = queue.poll();
      log("[take] 소비자 데이터 획득, signal() 호출");
      condition.signal();
      return data;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}

실행 결과

14:32:57.942 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV4 ==

14:32:57.944 [     main] 생산자 시작
14:32:57.957 [producer1] [생산 시도] data1 -> []
14:32:57.957 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:32:57.957 [producer1] [생산 완료] data1 -> [data1]
14:32:58.064 [producer2] [생산 시도] data2 -> [data1]
14:32:58.065 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:32:58.065 [producer2] [생산 완료] data2 -> [data1, data2]
14:32:58.169 [producer3] [생산 시도] data3 -> [data1, data2]
14:32:58.169 [producer3] [put] 큐가 가득 참, 생산자 대기

14:32:58.275 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:32:58.276 [     main] producer1: TERMINATED
14:32:58.276 [     main] producer2: TERMINATED
14:32:58.277 [     main] producer3: WAITING

14:32:58.277 [     main] 소비자 시작
14:32:58.279 [consumer1] [소비 시도]     ? <- [data1, data2]
14:32:58.279 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:32:58.279 [producer3] [put] 생산자 깨어남
14:32:58.279 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:32:58.279 [producer3] [생산 완료] data3 -> [data2, data3]
14:32:58.279 [consumer1] [소비 완료] data1 <- [data2]
14:32:58.379 [consumer2] [소비 시도]     ? <- [data2, data3]
14:32:58.380 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:32:58.380 [consumer2] [소비 완료] data2 <- [data3]
14:32:58.485 [consumer3] [소비 시도]     ? <- [data3]
14:32:58.485 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:32:58.485 [consumer3] [소비 완료] data3 <- []

14:32:58.591 [     main] 현재 상태 출력, 큐 데이터: []
14:32:58.591 [     main] producer1: TERMINATED
14:32:58.591 [     main] producer2: TERMINATED
14:32:58.591 [     main] producer3: TERMINATED
14:32:58.591 [     main] consumer1: TERMINATED
14:32:58.591 [     main] consumer2: TERMINATED
14:32:58.592 [     main] consumer3: TERMINATED
14:32:58.592 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV4 ==
14:33:26.174 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV4 ==

14:33:26.176 [     main] 소비자 시작
14:33:26.182 [consumer1] [소비 시도]     ? <- []
14:33:26.182 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.293 [consumer2] [소비 시도]     ? <- []
14:33:26.293 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.398 [consumer3] [소비 시도]     ? <- []
14:33:26.399 [consumer3] [take] 큐에 데이터가 없음, 소비자 대기

14:33:26.503 [     main] 현재 상태 출력, 큐 데이터: []
14:33:26.503 [     main] consumer1: WAITING
14:33:26.503 [     main] consumer2: WAITING
14:33:26.504 [     main] consumer3: WAITING

14:33:26.504 [     main] 생산자 시작
14:33:26.506 [producer1] [생산 시도] data1 -> []
14:33:26.506 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:33:26.506 [consumer1] [take] 소비자 깨어남
14:33:26.506 [producer1] [생산 완료] data1 -> [data1]
14:33:26.506 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:33:26.507 [consumer2] [take] 소비자 깨어남
14:33:26.507 [consumer1] [소비 완료] data1 <- []
14:33:26.507 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.609 [producer2] [생산 시도] data2 -> []
14:33:26.610 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:33:26.610 [producer2] [생산 완료] data2 -> [data2]
14:33:26.610 [consumer3] [take] 소비자 깨어남
14:33:26.610 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:33:26.610 [consumer3] [소비 완료] data2 <- []
14:33:26.610 [consumer2] [take] 소비자 깨어남
14:33:26.610 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:33:26.714 [producer3] [생산 시도] data3 -> []
14:33:26.714 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:33:26.715 [producer3] [생산 완료] data3 -> [data3]
14:33:26.715 [consumer2] [take] 소비자 깨어남
14:33:26.716 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:33:26.716 [consumer2] [소비 완료] data3 <- []

14:33:26.822 [     main] 현재 상태 출력, 큐 데이터: []
14:33:26.822 [     main] consumer1: TERMINATED
14:33:26.823 [     main] consumer2: TERMINATED
14:33:26.823 [     main] consumer3: TERMINATED
14:33:26.823 [     main] producer1: TERMINATED
14:33:26.823 [     main] producer2: TERMINATED
14:33:26.823 [     main] producer3: TERMINATED
14:33:26.823 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV4 ==

실행 결과 요약

  • 소비자 먼저 실행 시, 생산자가 put() 하자마자 소비자가 깨어나서 처리합니다.
  • 생산자 먼저 실행 시, 생산자가 가득 채운 후 WAITING 진입 → 소비자가 처리 후 깨어납니다.
  • 결과는 wait/notify 버전과 동일합니다.

정리

  • ReentrantLock + Conditionsynchronized보다 세분화된 제어가 가능합니다.
  • 하지만 아직은 스레드 대기 공간이 하나뿐이라 완전한 해결은 아닙니다.

2. 예제5 - Condition 분리: 생산자와 소비자 각각의 대기 공간

문제의 본질

  • 예제4에서는 condition.signal()모든 대기 스레드 중 무작위로 1개를 깨웁니다.
  • 그래서 생산자가 생산자를 깨우고, 소비자가 소비자를 깨우는 불필요한 깨어남 발생이 여전히 가능합니다.

예시 상황

  • queue.isFull() 상태에서 put()await()
  • 이후 take()가 실행되고 signal() → 다시 put()이 깨어남 → 조건 미충족 → 다시 대기

개선 전략: Condition을 생산자/소비자 용도로 분리하여 올바른 종류의 스레드만 깨우도록 제어합니다.

코드 구조

package thread.bounded;

import static util.MyLogger.log;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueueV5 implements BoundedQueue {

  private final Lock lock = new ReentrantLock();
  private final Condition producerCond = lock.newCondition();
  private final Condition consumerCond = lock.newCondition();


  private final Queue<String> queue = new ArrayDeque<>();
  private final int max;

  public BoundedQueueV5(int max) {
    this.max = max;
  }


  @Override
  public void put(String data) {
    lock.lock();
    try {
      while (queue.size() == max) {
        log("[put] 큐가 가득 참, 생산자 대기");
        try {
          producerCond.await();
          log("[put] 생산자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      queue.offer(data);
      log("[put] 생산자 데이터 저장, signal() 호출");
      consumerCond.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String take() {
    lock.lock();
    try {
      while (queue.isEmpty()) {
        log("[take] 큐에 데이터가 없음, 소비자 대기");
        try {
          consumerCond.await();
          log("[take] 소비자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      String data = queue.poll();
      log("[take] 소비자 데이터 획득, signal() 호출");
      producerCond.signal();
      return data;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}

실행 결과

14:36:35.759 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV5 ==

14:36:35.761 [     main] 생산자 시작
14:36:35.769 [producer1] [생산 시도] data1 -> []
14:36:35.769 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:36:35.770 [producer1] [생산 완료] data1 -> [data1]
14:36:35.874 [producer2] [생산 시도] data2 -> [data1]
14:36:35.875 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:36:35.875 [producer2] [생산 완료] data2 -> [data1, data2]
14:36:35.979 [producer3] [생산 시도] data3 -> [data1, data2]
14:36:35.979 [producer3] [put] 큐가 가득 참, 생산자 대기

14:36:36.084 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:36:36.084 [     main] producer1: TERMINATED
14:36:36.085 [     main] producer2: TERMINATED
14:36:36.085 [     main] producer3: WAITING

14:36:36.085 [     main] 소비자 시작
14:36:36.087 [consumer1] [소비 시도]     ? <- [data1, data2]
14:36:36.087 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:36:36.087 [producer3] [put] 생산자 깨어남
14:36:36.087 [consumer1] [소비 완료] data1 <- [data2]
14:36:36.087 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:36:36.088 [producer3] [생산 완료] data3 -> [data2, data3]
14:36:36.190 [consumer2] [소비 시도]     ? <- [data2, data3]
14:36:36.190 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:36:36.190 [consumer2] [소비 완료] data2 <- [data3]
14:36:36.295 [consumer3] [소비 시도]     ? <- [data3]
14:36:36.296 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:36:36.296 [consumer3] [소비 완료] data3 <- []

14:36:36.401 [     main] 현재 상태 출력, 큐 데이터: []
14:36:36.401 [     main] producer1: TERMINATED
14:36:36.401 [     main] producer2: TERMINATED
14:36:36.401 [     main] producer3: TERMINATED
14:36:36.401 [     main] consumer1: TERMINATED
14:36:36.402 [     main] consumer2: TERMINATED
14:36:36.402 [     main] consumer3: TERMINATED
14:36:36.402 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV5 ==
14:35:48.513 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV5 ==

14:35:48.514 [     main] 소비자 시작
14:35:48.520 [consumer1] [소비 시도]     ? <- []
14:35:48.520 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:35:48.624 [consumer2] [소비 시도]     ? <- []
14:35:48.624 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
14:35:48.729 [consumer3] [소비 시도]     ? <- []
14:35:48.729 [consumer3] [take] 큐에 데이터가 없음, 소비자 대기

14:35:48.835 [     main] 현재 상태 출력, 큐 데이터: []
14:35:48.836 [     main] consumer1: WAITING
14:35:48.836 [     main] consumer2: WAITING
14:35:48.836 [     main] consumer3: WAITING

14:35:48.836 [     main] 생산자 시작
14:35:48.838 [producer1] [생산 시도] data1 -> []
14:35:48.838 [producer1] [put] 생산자 데이터 저장, signal() 호출
14:35:48.838 [consumer1] [take] 소비자 깨어남
14:35:48.838 [producer1] [생산 완료] data1 -> [data1]
14:35:48.838 [consumer1] [take] 소비자 데이터 획득, signal() 호출
14:35:48.839 [consumer1] [소비 완료] data1 <- []
14:35:48.942 [producer2] [생산 시도] data2 -> []
14:35:48.942 [producer2] [put] 생산자 데이터 저장, signal() 호출
14:35:48.942 [producer2] [생산 완료] data2 -> [data2]
14:35:48.942 [consumer2] [take] 소비자 깨어남
14:35:48.943 [consumer2] [take] 소비자 데이터 획득, signal() 호출
14:35:48.943 [consumer2] [소비 완료] data2 <- []
14:35:49.047 [producer3] [생산 시도] data3 -> []
14:35:49.048 [producer3] [put] 생산자 데이터 저장, signal() 호출
14:35:49.048 [consumer3] [take] 소비자 깨어남
14:35:49.048 [consumer3] [take] 소비자 데이터 획득, signal() 호출
14:35:49.048 [producer3] [생산 완료] data3 -> [data3]
14:35:49.048 [consumer3] [소비 완료] data3 <- []

14:35:49.152 [     main] 현재 상태 출력, 큐 데이터: []
14:35:49.152 [     main] consumer1: TERMINATED
14:35:49.152 [     main] consumer2: TERMINATED
14:35:49.153 [     main] consumer3: TERMINATED
14:35:49.153 [     main] producer1: TERMINATED
14:35:49.153 [     main] producer2: TERMINATED
14:35:49.153 [     main] producer3: TERMINATED
14:35:49.153 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV5 ==

효과 요약

항목설명
대기 분리생산자, 소비자 스레드가 서로 다른 대기 공간에 들어감
신호 최적화생산자 → 소비자만 깨움 / 소비자 → 생산자만 깨움
효율성 향상불필요한 깨어남 없음 → 반복 대기 감소

핵심 요약

구성 요소설명
notFull생산자가 큐가 꽉 찼을 때 대기
notEmpty소비자가 큐가 비었을 때 대기
signal()정확한 대기 그룹만 선택적으로 깨움
while깨어나도 조건을 다시 검사 (spurious wakeup 대비)

실무에서도 매우 중요

  • Condition을 목적별로 분리해 사용하는 것은 고급 동기화 설계의 핵심입니다.
  • 특히 다중 생산자-소비자 구조에서는 필수 전략입니다.

3. 예제6 - BlockingQueue를 이용한 고수준 동기화

핵심 개념: BlockingQueue

자바가 제공하는 동기화 내장 큐로, 생산자-소비자 구조를 직접 구현하지 않아도 자동으로 락 / 대기 / 신호 처리가 됩니다.

BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
  • 내부에서 ReentrantLock, Condition, wait/notify를 이미 구현해 둡니다.
  • put()take() 메서드가 자동으로 대기 / 신호 처리

코드가 간결해지고, 락 관리, 대기/신호 처리가 모두 라이브러리 내부에서 처리됩니다.

주요 메서드 비교

메서드설명동작 방식
put()큐가 꽉 차면 자동으로 대기내부적으로 await()
take()큐가 비면 자동으로 대기내부적으로 await()
offer()비동기 삽입, 실패 시 false 반환대기 X
poll()비동기 제거, 실패 시 null 반환대기 X

BlockingQueue 장점

항목설명
코드 복잡도매우 간결
락/조건/대기내부에서 자동 처리
실무 사용생산자-소비자 구조의 표준 구현체
대표 클래스ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue

BlockingQueue 단점

  • 행동의 세부 제어가 어렵습니다
    • 예: 특정 조건에서만 notify, 대기 시간 제한 등은 추가 조정 필요
  • 동작 원리를 알아야 신뢰하고 사용할 수 있습니다

핵심 정리

  • BlockingQueue는 생산자-소비자 문제의 최종 해법에 가까운 고수준 도구입니다.
  • 내부에 모든 동기화 처리 로직이 구현되어 있어, 실무에서도 가장 많이 사용됩니다.

4. BlockingQueue 버전별 비교 (V1 ~ V4)

V1 - 기본 put() / take() 사용

package thread.bounded;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueV6_1 implements BoundedQueue {

  private BlockingQueue<String> queue;

  public BoundedQueueV6_1(int max) {
    queue = new ArrayBlockingQueue<>(max);
  }

  @Override
  public void put(String data) {
    try {
      queue.put(data);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public String take() {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}
15:00:28.523 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV6_1 ==

15:00:28.525 [     main] 생산자 시작
15:00:28.537 [producer1] [생산 시도] data1 -> []
15:00:28.537 [producer1] [생산 완료] data1 -> [data1]
15:00:28.642 [producer2] [생산 시도] data2 -> [data1]
15:00:28.642 [producer2] [생산 완료] data2 -> [data1, data2]
15:00:28.747 [producer3] [생산 시도] data3 -> [data1, data2]

15:00:28.853 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:00:28.854 [     main] producer1: TERMINATED
15:00:28.854 [     main] producer2: TERMINATED
15:00:28.854 [     main] producer3: WAITING

15:00:28.854 [     main] 소비자 시작
15:00:28.856 [consumer1] [소비 시도]     ? <- [data1, data2]
15:00:28.856 [producer3] [생산 완료] data3 -> [data2, data3]
15:00:28.856 [consumer1] [소비 완료] data1 <- [data2]
15:00:28.958 [consumer2] [소비 시도]     ? <- [data2, data3]
15:00:28.958 [consumer2] [소비 완료] data2 <- [data3]
15:00:29.064 [consumer3] [소비 시도]     ? <- [data3]
15:00:29.064 [consumer3] [소비 완료] data3 <- []

15:00:29.170 [     main] 현재 상태 출력, 큐 데이터: []
15:00:29.170 [     main] producer1: TERMINATED
15:00:29.171 [     main] producer2: TERMINATED
15:00:29.171 [     main] producer3: TERMINATED
15:00:29.171 [     main] consumer1: TERMINATED
15:00:29.171 [     main] consumer2: TERMINATED
15:00:29.171 [     main] consumer3: TERMINATED
15:00:29.172 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV6_1 ==
15:00:47.647 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV6_1 ==

15:00:47.648 [     main] 소비자 시작
15:00:47.652 [consumer1] [소비 시도]     ? <- []
15:00:47.754 [consumer2] [소비 시도]     ? <- []
15:00:47.860 [consumer3] [소비 시도]     ? <- []

15:00:47.965 [     main] 현재 상태 출력, 큐 데이터: []
15:00:47.966 [     main] consumer1: WAITING
15:00:47.966 [     main] consumer2: WAITING
15:00:47.967 [     main] consumer3: WAITING

15:00:47.967 [     main] 생산자 시작
15:00:47.968 [producer1] [생산 시도] data1 -> []
15:00:47.969 [producer1] [생산 완료] data1 -> [data1]
15:00:47.969 [consumer1] [소비 완료] data1 <- []
15:00:48.071 [producer2] [생산 시도] data2 -> []
15:00:48.072 [consumer2] [소비 완료] data2 <- []
15:00:48.072 [producer2] [생산 완료] data2 -> [data2]
15:00:48.177 [producer3] [생산 시도] data3 -> []
15:00:48.177 [producer3] [생산 완료] data3 -> [data3]
15:00:48.178 [consumer3] [소비 완료] data3 <- []

15:00:48.282 [     main] 현재 상태 출력, 큐 데이터: []
15:00:48.283 [     main] consumer1: TERMINATED
15:00:48.283 [     main] consumer2: TERMINATED
15:00:48.283 [     main] consumer3: TERMINATED
15:00:48.283 [     main] producer1: TERMINATED
15:00:48.283 [     main] producer2: TERMINATED
15:00:48.283 [     main] producer3: TERMINATED
15:00:48.283 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV6_1 ==

put(), take()만 호출의 특징

  • 자바가 제공하는 BlockingQueue의 기본 동작을 사용합니다.

    큐가 가득 차면 자동 대기, 비어 있으면 자동 대기하빈다.

장점

  • 내부적으로 자동으로 wait/notify, 락을 처리합니다.
  • 스레드 간 협력, 대기, 신호가 모두 안정적으로 동작합니다.

한계

  • InterruptedException은 런타임 예외로 던지고 있어 예외 처리 전략이 다소 단순합니다.
    → 실제 서비스에서는 Thread.currentThread().interrupt()로 복구 가능성 고려 필요합니다.

결론: 실무에서 가장 추천되는 안정형 구조입니다.

V2 - offer() / poll() 사용 (비차단 방식)

package thread.bounded;

import static util.MyLogger.log;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueV6_2 implements BoundedQueue {

  private BlockingQueue<String> queue;

  public BoundedQueueV6_2(int max) {
    queue = new ArrayBlockingQueue<>(max);
  }

  @Override
  public void put(String data) {
    boolean result = queue.offer(data);
    log("저장 시도 결과 = " + result);
  }

  @Override
  public String take() {
    return queue.poll();
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}
15:01:08.344 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV6_2 ==

15:01:08.344 [     main] 생산자 시작
15:01:08.355 [producer1] [생산 시도] data1 -> []
15:01:08.355 [producer1] 저장 시도 결과 = true
15:01:08.356 [producer1] [생산 완료] data1 -> [data1]
15:01:08.452 [producer2] [생산 시도] data2 -> [data1]
15:01:08.452 [producer2] 저장 시도 결과 = true
15:01:08.452 [producer2] [생산 완료] data2 -> [data1, data2]
15:01:08.557 [producer3] [생산 시도] data3 -> [data1, data2]
15:01:08.557 [producer3] 저장 시도 결과 = false
15:01:08.558 [producer3] [생산 완료] data3 -> [data1, data2]

15:01:08.663 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:01:08.663 [     main] producer1: TERMINATED
15:01:08.664 [     main] producer2: TERMINATED
15:01:08.664 [     main] producer3: TERMINATED

15:01:08.665 [     main] 소비자 시작
15:01:08.666 [consumer1] [소비 시도]     ? <- [data1, data2]
15:01:08.666 [consumer1] [소비 완료] data1 <- [data2]
15:01:08.769 [consumer2] [소비 시도]     ? <- [data2]
15:01:08.769 [consumer2] [소비 완료] data2 <- []
15:01:08.875 [consumer3] [소비 시도]     ? <- []
15:01:08.876 [consumer3] [소비 완료] null <- []

15:01:08.979 [     main] 현재 상태 출력, 큐 데이터: []
15:01:08.979 [     main] producer1: TERMINATED
15:01:08.979 [     main] producer2: TERMINATED
15:01:08.979 [     main] producer3: TERMINATED
15:01:08.980 [     main] consumer1: TERMINATED
15:01:08.980 [     main] consumer2: TERMINATED
15:01:08.980 [     main] consumer3: TERMINATED
15:01:08.980 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV6_2 ==
15:01:30.593 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV6_2 ==

15:01:30.594 [     main] 소비자 시작
15:01:30.602 [consumer1] [소비 시도]     ? <- []
15:01:30.610 [consumer1] [소비 완료] null <- []
15:01:30.717 [consumer2] [소비 시도]     ? <- []
15:01:30.717 [consumer2] [소비 완료] null <- []
15:01:30.823 [consumer3] [소비 시도]     ? <- []
15:01:30.823 [consumer3] [소비 완료] null <- []

15:01:30.928 [     main] 현재 상태 출력, 큐 데이터: []
15:01:30.929 [     main] consumer1: TERMINATED
15:01:30.929 [     main] consumer2: TERMINATED
15:01:30.929 [     main] consumer3: TERMINATED

15:01:30.929 [     main] 생산자 시작
15:01:30.932 [producer1] [생산 시도] data1 -> []
15:01:30.932 [producer1] 저장 시도 결과 = true
15:01:30.932 [producer1] [생산 완료] data1 -> [data1]
15:01:31.034 [producer2] [생산 시도] data2 -> [data1]
15:01:31.034 [producer2] 저장 시도 결과 = true
15:01:31.034 [producer2] [생산 완료] data2 -> [data1, data2]
15:01:31.139 [producer3] [생산 시도] data3 -> [data1, data2]
15:01:31.140 [producer3] 저장 시도 결과 = false
15:01:31.140 [producer3] [생산 완료] data3 -> [data1, data2]

15:01:31.245 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:01:31.245 [     main] consumer1: TERMINATED
15:01:31.245 [     main] consumer2: TERMINATED
15:01:31.245 [     main] consumer3: TERMINATED
15:01:31.245 [     main] producer1: TERMINATED
15:01:31.245 [     main] producer2: TERMINATED
15:01:31.245 [     main] producer3: TERMINATED
15:01:31.246 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV6_2 ==

offer() / poll() 사용 (비차단 방식)의 특징

  • 대기 없이 즉시 실패/성공 처리합니다.
  • 성공 여부를 로그로 확인합니다.

장점

  • 스레드 블로킹 없습니다.

    시스템이 멈추지 않고 비동기 처리가 필요한 상황에 적합합니다. (예: 로깅 큐, 이벤트 처리)

한계

  • 큐가 가득 차면 offer()는 실패 (false) → 데이터 손실 가능합니다.

    큐가 비어 있으면 poll()null 반환 → 소비 실패 처리 필요합니다.

결론: 실시간성 우선 또는 유실 허용 가능한 로직에 적합합니다.

V3 - offer() / poll() with timeout 사용

package thread.bounded;

import static util.MyLogger.log;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueV6_3 implements BoundedQueue {

  private BlockingQueue<String> queue;

  public BoundedQueueV6_3(int max) {
    queue = new ArrayBlockingQueue<>(max);
  }

  @Override
  public void put(String data) {
    try {
      // 대기 시간 설정 가능
      boolean result = queue.offer(data, 1, TimeUnit.NANOSECONDS);
      log("저장 시도 결과 = " + result);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public String take() {
    try {
      // 대기 시간 설정 가능
      return queue.poll(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}
15:02:03.062 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV6_3 ==

15:02:03.063 [     main] 생산자 시작
15:02:03.071 [producer1] [생산 시도] data1 -> []
15:02:03.072 [producer1] 저장 시도 결과 = true
15:02:03.072 [producer1] [생산 완료] data1 -> [data1]
15:02:03.171 [producer2] [생산 시도] data2 -> [data1]
15:02:03.172 [producer2] 저장 시도 결과 = true
15:02:03.172 [producer2] [생산 완료] data2 -> [data1, data2]
15:02:03.276 [producer3] [생산 시도] data3 -> [data1, data2]
15:02:03.276 [producer3] 저장 시도 결과 = false
15:02:03.276 [producer3] [생산 완료] data3 -> [data1, data2]

15:02:03.381 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:02:03.381 [     main] producer1: TERMINATED
15:02:03.382 [     main] producer2: TERMINATED
15:02:03.382 [     main] producer3: TERMINATED

15:02:03.382 [     main] 소비자 시작
15:02:03.383 [consumer1] [소비 시도]     ? <- [data1, data2]
15:02:03.383 [consumer1] [소비 완료] data1 <- [data2]
15:02:03.487 [consumer2] [소비 시도]     ? <- [data2]
15:02:03.488 [consumer2] [소비 완료] data2 <- []
15:02:03.592 [consumer3] [소비 시도]     ? <- []

15:02:03.698 [     main] 현재 상태 출력, 큐 데이터: []
15:02:03.698 [     main] producer1: TERMINATED
15:02:03.699 [     main] producer2: TERMINATED
15:02:03.699 [     main] producer3: TERMINATED
15:02:03.699 [     main] consumer1: TERMINATED
15:02:03.699 [     main] consumer2: TERMINATED
15:02:03.699 [     main] consumer3: TIMED_WAITING
15:02:03.700 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV6_3 ==
15:02:31.702 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV6_3 ==

15:02:31.704 [     main] 소비자 시작
15:02:31.711 [consumer1] [소비 시도]     ? <- []
15:02:31.823 [consumer2] [소비 시도]     ? <- []
15:02:31.928 [consumer3] [소비 시도]     ? <- []

15:02:32.033 [     main] 현재 상태 출력, 큐 데이터: []
15:02:32.034 [     main] consumer1: TIMED_WAITING
15:02:32.034 [     main] consumer2: TIMED_WAITING
15:02:32.035 [     main] consumer3: TIMED_WAITING

15:02:32.035 [     main] 생산자 시작
15:02:32.037 [producer1] [생산 시도] data1 -> []
15:02:32.038 [consumer1] [소비 완료] data1 <- []
15:02:32.038 [producer1] 저장 시도 결과 = true
15:02:32.038 [producer1] [생산 완료] data1 -> []
15:02:32.139 [producer2] [생산 시도] data2 -> []
15:02:32.139 [producer2] 저장 시도 결과 = true
15:02:32.139 [consumer2] [소비 완료] data2 <- []
15:02:32.139 [producer2] [생산 완료] data2 -> []
15:02:32.244 [producer3] [생산 시도] data3 -> []
15:02:32.245 [producer3] 저장 시도 결과 = true
15:02:32.245 [consumer3] [소비 완료] data3 <- []
15:02:32.245 [producer3] [생산 완료] data3 -> []

15:02:32.349 [     main] 현재 상태 출력, 큐 데이터: []
15:02:32.349 [     main] consumer1: TERMINATED
15:02:32.349 [     main] consumer2: TERMINATED
15:02:32.349 [     main] consumer3: TERMINATED
15:02:32.349 [     main] producer1: TERMINATED
15:02:32.350 [     main] producer2: TERMINATED
15:02:32.350 [     main] producer3: TERMINATED
15:02:32.350 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV6_3 ==

특징

  • 제한된 시간 동안 대기하고, 시간이 지나면 포기합니다.

    유연한 동작 제어가 가능합니다.

장점

  • 블로킹을 완전히 피하지 않으면서도, 시스템 정지 없이 일정 시간만 기다립니다.

    예측 가능한 반응 시간 확보가 필수입니다.

한계

  • 타임아웃을 짧게 설정하면 V6_2와 유사한 비동기성
  • 타임아웃을 길게 설정하면 V6_1처럼 동기화 처리

결론: 반응성과 안정성의 균형을 원하는 경우 이상적입니다.

V4 - add() / remove() 사용 (예외 기반 처리)

package thread.bounded;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueV6_4 implements BoundedQueue {

  private BlockingQueue<String> queue;

  public BoundedQueueV6_4(int max) {
    queue = new ArrayBlockingQueue<>(max);
  }

  @Override
  public void put(String data) {
    queue.add(data); // java.lang.IllegalStateException: Queue full
  }

  @Override
  public String take() {
    return queue.remove(); // java.util.NoSuchElementException
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}
15:02:50.290 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV6_4 ==

15:02:50.291 [     main] 생산자 시작
15:02:50.301 [producer1] [생산 시도] data1 -> []
15:02:50.301 [producer1] [생산 완료] data1 -> [data1]
15:02:50.400 [producer2] [생산 시도] data2 -> [data1]
15:02:50.401 [producer2] [생산 완료] data2 -> [data1, data2]
15:02:50.505 [producer3] [생산 시도] data3 -> [data1, data2]
Exception in thread "producer3" java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
	at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:16)
	at thread.bounded.ProducerTask.run(ProducerTask.java:18)
	at java.base/java.lang.Thread.run(Thread.java:1575)

15:02:50.610 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:02:50.611 [     main] producer1: TERMINATED
15:02:50.611 [     main] producer2: TERMINATED
15:02:50.611 [     main] producer3: TERMINATED

15:02:50.611 [     main] 소비자 시작
15:02:50.614 [consumer1] [소비 시도]     ? <- [data1, data2]
15:02:50.614 [consumer1] [소비 완료] data1 <- [data2]
15:02:50.718 [consumer2] [소비 시도]     ? <- [data2]
15:02:50.718 [consumer2] [소비 완료] data2 <- []
15:02:50.824 [consumer3] [소비 시도]     ? <- []
Exception in thread "consumer3" java.util.NoSuchElementException
	at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
	at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
	at java.base/java.lang.Thread.run(Thread.java:1575)

15:02:50.930 [     main] 현재 상태 출력, 큐 데이터: []
15:02:50.930 [     main] producer1: TERMINATED
15:02:50.930 [     main] producer2: TERMINATED
15:02:50.930 [     main] producer3: TERMINATED
15:02:50.931 [     main] consumer1: TERMINATED
15:02:50.931 [     main] consumer2: TERMINATED
15:02:50.931 [     main] consumer3: TERMINATED
15:02:50.931 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV6_4 ==
15:03:09.760 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV6_4 ==

15:03:09.761 [     main] 소비자 시작
15:03:09.767 [consumer1] [소비 시도]     ? <- []
Exception in thread "consumer1" java.util.NoSuchElementException
	at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
	at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
	at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:09.878 [consumer2] [소비 시도]     ? <- []
Exception in thread "consumer2" java.util.NoSuchElementException
	at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
	at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
	at java.base/java.lang.Thread.run(Thread.java:1575)
15:03:09.985 [consumer3] [소비 시도]     ? <- []
Exception in thread "consumer3" java.util.NoSuchElementException
	at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21)
	at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
	at java.base/java.lang.Thread.run(Thread.java:1575)

15:03:10.090 [     main] 현재 상태 출력, 큐 데이터: []
15:03:10.091 [     main] consumer1: TERMINATED
15:03:10.091 [     main] consumer2: TERMINATED
15:03:10.091 [     main] consumer3: TERMINATED

15:03:10.091 [     main] 생산자 시작
15:03:10.094 [producer1] [생산 시도] data1 -> []
15:03:10.094 [producer1] [생산 완료] data1 -> [data1]
15:03:10.197 [producer2] [생산 시도] data2 -> [data1]
15:03:10.197 [producer2] [생산 완료] data2 -> [data1, data2]
15:03:10.302 [producer3] [생산 시도] data3 -> [data1, data2]
Exception in thread "producer3" java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
	at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:16)
	at thread.bounded.ProducerTask.run(ProducerTask.java:18)
	at java.base/java.lang.Thread.run(Thread.java:1575)

15:03:10.408 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:03:10.409 [     main] consumer1: TERMINATED
15:03:10.409 [     main] consumer2: TERMINATED
15:03:10.409 [     main] consumer3: TERMINATED
15:03:10.409 [     main] producer1: TERMINATED
15:03:10.409 [     main] producer2: TERMINATED
15:03:10.409 [     main] producer3: TERMINATED
15:03:10.410 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV6_4 ==

특징

  • 큐의 상태를 예외로 판단
    (Queue full, NoSuchElementException)

장점

  • 예외 발생으로 문제 상황을 명확히 감지가 가능합니다.
  • 예외를 catch 하여 로깅하거나 특정 로직 실행이 가능합니다.

한계

  • 예외는 비용이 크고, 흐름 제어에 적합하지 않습니다.
  • 실무에서는 예외를 정상 흐름으로 사용하는 것은 지양합니다.

결론: 예외 상황 테스트나 교육용 데모에는 적합하지만, 실제 서비스 로직에는 추천하지 않습니다.

최종 비교 요약표

버전사용 메서드대기 방식장점단점적합한 상황
V6_1put() / take()무제한 대기가장 안정적, 동기화 완비인터럽트 처리 미흡일반적인 생산자-소비자 처리
V6_2offer() / poll()대기 없음빠름, 유실 감수데이터 손실 가능성로깅, 모니터링 큐
V6_3offer(timeout) / poll(timeout)제한 대기반응성과 안정성의 균형타임아웃 튜닝 필요실시간 + 안정성 요구
V6_4add() / remove()예외로 제어에러 감지 용이예외 남발 비효율테스트, 예외 케이스 학습용

ddsss

실무 팁:

  • 기본은 V6_1
  • 고속 처리를 원하면 V6_2 or V6_3
  • 예외 감지는 V6_4는 피하고, offer() + 결과 확인으로 대체 추천
profile
🤔오늘의 복습은❓

0개의 댓글