생산자 스레드가 대기하는 곳과, 소비사 스레드가 대기하는 곳을 나누면 비효율성 문제를 해결!
Lock, ReentrantLock을 사용
//아직 공간을 분리하지 않아 비효율성 문제는 발생
public class BoundedQueueV4 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition(); //ReentrantLock을 사용하는 스레드가 기다리는 대기 집합
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV4(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
condition.await(); //condition이라는 곳에 들어가서 대기 WAITING
log("[put] 생산자가 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
condition.signal(); //signal을 주면 대기하던 스레드가 깨어난다.
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
condition.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
Condition
ReentrantLock이 스레드가 대기하는 공간.
lock.newCondition() 메서드를 호출, 스레드 대기 공간 형성
condition.await()
현재 스레드를 대기상태로 보관
ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관
condition.signal()
condition에서 대기중인 스레드를 하나 깨움.

lock은 synchornized에서 사용하는 객체 내부에 있는 모니터 락이 아니라, ReentrantLock을 의미

//생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다.
public class BoundedQueueV5 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition(); //생산자 대기 공간
private final Condition consumerCond = lock.newCondition(); //소비자 대기 공간
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await(); //condition이라는 곳에 들어가서 대기 WAITING
log("[put] 생산자가 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, consumerCond() 호출");
consumerCond.signal(); //signal을 주면 대기하던 스레드가 깨어난다.
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, producerCond() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
Condition 분리
consumerCond : 생산자를 위한 스레드 대기 공간
producerCond : 소비자를 위한 스레드 대기 공간
put(data) - 생산자 스레드 호출
producerCond.await() 호출을 통해 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관
consumerCond.signal() 호출해서 소비자 전용 스레드 대기 공간에 신호 보냄
take() - 소비자 스레드가 호출
consumerCond.await() : 큐가 비었을 경우 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관
producerCond.signal() : 소비자가 데이터를 소비한 경우, 큐에 여유공간이 생김. 생산자 전용 스레드 대기 공간에 신호를 보냄
Object.notify() vs Condition.signal()
Object.notify
대기중인 스레드 중 임의의 스레드를 선택해서 깨움, 순서 보장 X
synchronized block 내에서 모니터 락을 갖고 잇는 스레드가 호출
Condition.signal
대기중인 스레드중 하나를 깨우며, 일반적으로 FIFO 순서로 깨움
synchronized 대기
락 획득 대기
wait

개념상으로 보면, 락 대기 집합이 1차 대기소, 스레드 대기 집합이 2차 대기소
2차를 거쳐 1차 대기소까지 나와야 임계영역에서 로직을 수행 할 수 있다.
ReentrantLock 대기

ReentrantLock 락 획득 대기
await() 대기
ReentrantLock 또한 2차 대기소
데이터 추가 차단
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
/
대표적 구현체
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것
public BoundedQueueV6_1(int max) {
queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
ArrayBlockingQueue.put()
public class ArrayBlockingQueue {
final Object[] items;
int count;
ReentrantLock lock;
Condition notEmpty; //소비자 스레드가 대기하는 condition
Condition notFull; //생산자 스레드가 대기하는 condition
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length) {
notFull.await();
}
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
items[putIndex] = e;
count++;
notEmpty.signal();
}
}
실무에서 멀티스레드를 사용할 때는 응답성이 중요
큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.
1. 예외를 던진다. 예외를 받아서 처리한다.
2. 대기하지 않는다. 즉시 false를 반환한다.
3. 대기한다.
4. 특정 시간 만큼만 대기한다.

Throws Exception - 대기시 예외
IllegalStateException 예외를 던짐 NoSuchElementException 을 던짐 NoSuchElementException 를 반환 Special Value - 대기시 즉시 반환
false 를 반환null 을 반환null 을 반환.Blocks - 대기
Times Out - 시간 대기
false 를 반환한다.null 을 반환한다.public class BoundedQueueV6_2 implements BoundedQueue {
private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것
public BoundedQueueV6_2(int max) {
queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
}
@Override
public void put(String data) {
boolean rst = queue.offer(data);
// 버퍼가 가득 차있는 경우 데이터를 추가하지 않고 바로 false
log("저장 시도 결과 = " + rst);
}
@Override
public String take() {
// 버퍼가 비어있는 경우 null
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
public class BoundedQueueV6_3 implements BoundedQueue {
private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것
public BoundedQueueV6_3(int max) {
queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
}
@Override
public void put(String data) {
try {
//대기 시간 설정 가능
boolean rst = queue.offer(data, 1, TimeUnit.NANOSECONDS);
log("저장 시도 결과 = " + rst);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
//대기 시간 설정 가능
return queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
public class BoundedQueueV6_4 implements BoundedQueue {
private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것
public BoundedQueueV6_4(int max) {
queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
}
@Override
public void put(String data) {
queue.add(data); //IllegalStateException : Queue full
}
@Override
public String take() {
return queue.remove(); //NoSuchElementException
}
@Override
public String toString() {
return queue.toString();
}
}