이전에는 Object
클래스를 통해 생산자-소비자 문제를 해결하려 하였으나 아래 문제점이 존재했었다.
이번에는 또 다른 해결 방법인 Condition
과 BlockingQueue
를 알아본다.
Condition
인터페이스Object
의 모니터 락 관련 메서드를 별도의 객체로 분리하여 Lock
구현체와 함께 사용 가능하도록 함으로써 객체당 여러 개의 대기 집합을 갖도록 한다.
Lock
인터페이스의 newCondition()
통하여 생성 가능하며, 생성 시 해당 Lock의 스레드 대기 공간을 생성할 수 있다.
Condition
을 생산자와 소비자 각각 생성함으로써 특정 타입의 Thread를 깨울 수 있도록 처리할 수 있다.
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
await()
void await() throws InterruptedException
WAITING
상태로 condition
에 보관InterruptedException
발생하고 인터럽트 플래그가 지워짐Object.wait()
과 유사signal()
void signal()
condition
에서 대기중인 스레드 중 하나를 깨움Object.notify()
와 유사 class BoundedBuffer<E> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition(); // 생산자 Thread 대기
final Condition notEmpty = lock.newCondition(); // 소비자 Thread 대기
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(E x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 생산자 Thread 대기
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 소비자 Thread 깨움
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 소비자 Thread 대기
E x = (E) items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 생산자 Thread 깨움
return x;
} finally {
lock.unlock();
}
}
}
BlockingQueue
인터페이스일반적인 Queue
대비 아래 기능을 추가로 지원하는 큐이다.
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
대기 시 동작과 기능에 따라 아래와 같이 분류된다.
즉시 락을 획득할 수 없는 경우 처리 방법에 따라 아래와 같이 네 가지로 나뉜다.
Throws exception
: 대기 시 예외 발생Special value
: 연산에 따라 null
또는 false
리턴Blocks
: 연산 성공 시점까지 무기한 대기Times Out
: 주어진 최대 시간 제한 동안 대기 후 null
또는 false
리턴BlockingQueue
인터페이스가 Queue
인터페이스를 상속 받으므로 구현체 모두 Collection Framework에 해당된다.
ArrayBlockingQueue
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
}
Bounded Buffer
LinkedBlockingQueue
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
}
Bounded Buffer
Integer.MAX_VALUE