[Java] 생산자-소비자 문제와 BlockingQueue

MEUN·2024년 11월 19일
0

이전 글

[Java] 생산자-소비자 문제와 Object 클래스



개요

이전에는 Object 클래스를 통해 생산자-소비자 문제를 해결하려 하였으나 아래 문제점이 존재했었다.

  • 비효율 : 특정 Thread를 깨울 수 없어 동일 타입의 Thread를 깨울 때 비효율 발생 가능
  • 기아 상태 : 특정 Thread를 깨울 수 없어 특정 스레드를 제외하고 깨울 수 있는 상황 발생 가능

이번에는 또 다른 해결 방법인 ConditionBlockingQueue를 알아본다.



Condition 인터페이스

Object의 모니터 락 관련 메서드를 별도의 객체로 분리하여 Lock 구현체와 함께 사용 가능하도록 함으로써 객체당 여러 개의 대기 집합을 갖도록 한다.
Lock 인터페이스의 newCondition() 통하여 생성 가능하며, 생성 시 해당 Lock의 스레드 대기 공간을 생성할 수 있다.
Condition을 생산자와 소비자 각각 생성함으로써 특정 타입의 Thread를 깨울 수 있도록 처리할 수 있다.

public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

주요 메서드

await()

void await() throws InterruptedException
  • 현재 스레드를 WAITING 상태로 condition에 보관
  • 호출 시 인터럽트 플래그가 설정되어 있거나 대기 중 인터럽트 발생 시 InterruptedException 발생하고 인터럽트 플래그가 지워짐
  • Object.wait()과 유사

signal()

void signal()
  • condition에서 대기중인 스레드 중 하나를 깨움
  • Object.notify()와 유사

예제

 class BoundedBuffer<E> {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); // 생산자 Thread 대기
   final Condition notEmpty = lock.newCondition(); // 소비자 Thread 대기

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(E x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await(); // 생산자 Thread 대기
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal(); // 소비자 Thread 깨움
     } finally {
       lock.unlock();
     }
   }

   public E take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await(); // 소비자 Thread 대기
       E x = (E) items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal(); // 생산자 Thread 깨움
       return x;
     } finally {
       lock.unlock();
     }
   }
 }


BlockingQueue 인터페이스

일반적인 Queue 대비 아래 기능을 추가로 지원하는 큐이다.

  • 요소 검색 시 큐가 비어 있지 않을 때까지 기다리는 작업
  • 요소 저장 시 큐에 공간이 확보될 때까지 기다리는 작업

public interface BlockingQueue<E> extends Queue<E> {

    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    int remainingCapacity();
    boolean remove(Object o);
    boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}

기능별 주요 메서드

대기 시 동작과 기능에 따라 아래와 같이 분류된다.

출처: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/BlockingQueue.html

대기 시 동작

즉시 락을 획득할 수 없는 경우 처리 방법에 따라 아래와 같이 네 가지로 나뉜다.

  • Throws exception : 대기 시 예외 발생
  • Special value : 연산에 따라 null 또는 false 리턴
  • Blocks : 연산 성공 시점까지 무기한 대기
  • Times Out : 주어진 최대 시간 제한 동안 대기 후 null 또는 false 리턴

주요 구현체

BlockingQueue 인터페이스가 Queue 인터페이스를 상속 받으므로 구현체 모두 Collection Framework에 해당된다.

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
}
  • 요소를 FIFO(선입선출) 방식으로 정렬
  • 새 요소는 대기열의 끝에 삽입
  • 고정된 크기의 배열에 요소를 보관하는 전형적인 Bounded Buffer

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
     static class Node<E> {
        E item;
        
        Node<E> next;

        Node(E x) { item = x; }
    }
    
    private final int capacity;
    transient Node<E> head;
    private transient Node<E> last;
    
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
}
  • 연결된 노드 기반으로 바인딩된 Bounded Buffer
  • 요소를 FIFO(선입선출) 방식으로 정렬
  • 새 요소는 큐의 끝에 삽입
  • 생성 시 사이즈를 지정하지 않으면 사이즈는 Integer.MAX_VALUE



참고 자료

0개의 댓글