생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 문제중 하나로 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다. 멀티스레드의 핵심을 제대로 이해하려면 반드시 생산자 소비자 문제를 이해하고, 올바른 해결 방안도 함께 알아두어야 한다.
기본 개념
문제 상황
public class BoundedQueueV1 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV1(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
if (queue.size() == max) {
log("[put] 큐가 가득 참, 버림: " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if (queue.isEmpty()) {
return null;
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
BoundedQueueV1
: 한정된 버퍼 역할을 하는 가장 단순한 구현체이다. 이후에 버전이 점점 올라가면서 코드를 개선한다.Queue
,ArrayDeque
: 데이터를 중간에 보관하는 버퍼로 큐를 사용한다. 구현체로는 ArrayDeque
를 사용한다.int max
: 한정된 버퍼이므로 버퍼에 저장할 수 있는 최대 크기를 지정한다.put()
: 큐에 데이터를 저장한다. 큐가 가득 찬 경우 더는 데이터를 보관할 수 없으므로 데이터를 버린다.take()
: 큐의 데이터를 가져간다. 큐에 데이터가 없는 경우 null
을 반환한다.toString()
: 버퍼 역할을 하는 queue
정보를 출력한다.import static util.MyLogger.log;
public class ProducerTask implements Runnable {
private BoundedQueue queue;
private String request;
public ProducerTask(BoundedQueue queue, String request) {
this.queue = queue;
this.request = request;
}
@Override
public void run() {
log("[생산 시도] " + request + " -> " + queue);
queue.put(request);
log("[생산 완료] " + request + " -> " + queue);
}
}
Runnable
을 구현한다.public class ConsumerTask implements Runnable {
private BoundedQueue queue;
public ConsumerTask(BoundedQueue queue) {
this.queue = queue;
}
@Override
public void run() {
log("[소비 시도] ? <- " + queue);
String data = queue.take();
log("[소비 완료] " + data + " <- " + queue);
}
}
Runnable
을 구현한다.public class BoundedMain {
public static void main(String[] args) {
// 1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV1(2);
// 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
producerFirst(queue); // 생산자 먼저 실행
//consumerFirst(queue); // 소비자 먼저 실행
}
private static void producerFirst(BoundedQueue queue) {
log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + "==");
List<Thread> threads = new ArrayList<>();
startProducer(queue, threads);
printAllState(queue, threads);
startConsumer(queue, threads);
printAllState(queue, threads);
log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + "==");
}
private static void consumerFirst(BoundedQueue queue) {
log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName()+ "==");
List<Thread> threads = new ArrayList<>();
startConsumer(queue, threads);
printAllState(queue, threads);
startProducer(queue, threads);
printAllState(queue, threads);
log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
private static void startProducer(BoundedQueue queue, List<Thread> threads){
System.out.println(); log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i),
"producer" + i);
threads.add(producer);
producer.start();
sleep(100);
}
}
private static void startConsumer(BoundedQueue queue, List<Thread> threads){
System.out.println();
log("소비자 시작");
for (int i = 1; i <= 3; i++) {
threads.add(consumer);
consumer.start();
sleep(100);
}
}
}
producerFirst 코드 분석
threads
: 스레드의 결과 상태를 한꺼번에 출력하기 위해 생성한 스레드를 보관해둠startProducer
: 생산자 스레드를 3개 만들어서 실행한다.startConsumer
: 소비자 스레드를 3개 만들어서 실행한다.p
는 생산자 스레드를 뜻한다.c
는 소비자 스레드를 뜻한다.synchronized
영역을 뜻한다. 스레드가 이 영역에서 들어가려면 모니터 락이 필요하다.생산자 스레드 실행 시작
boundedQueue
에 저장을 완료했다.put()
내부에서 data3은 버린다.소비자 스레드 실행 시작
null
을 반환받는다.p3
이 보관하는 data3
은 버려지고 c3
은 데이터를 받지 못한다.c1,c2,c3
은 데이터를 받지 못한다. 그리고 p3
이 보관하는data3
은 버려진다. Object-wait,notify
synchronized
를 사용한 임계영역에서 락을 가지고 무한 대기하는 문제는 Object
클래스에 해결방안이 있다. Object
클래스는 무한 대기 문제를 해결하는 wait(),notify()
라는 메서드를 제공한다.
Object.wait()
Waiting
)Waiting
) 상태로 전환한다. 이 메서드는 현재 스레드가 synchronized
블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다. 호출한 스레드는 락을 반납하고 다른 스레드가 해당 락을 획득 할 수 있도록한다. 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify()
/notifyAll()
을 호출할 때 까지 대기상태를 유지한다.Object.notify()
synchronized
블록이나 메서드에서 호출되어야한다. 깨운 스레드는 다시 락을 획득할 기회를 얻게된다. 만약 대기중인 스레드가 여러개라면 그중 하나만이 깨워지게 된다.Object.notifyAll()
synchronized
블럭이나 메서드에서 호출되어야 하며, 모든 대기 중인 스레드가 락을 획득할 기회를 얻게 된다. 코드 변경
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify();
// 대기 스레드, WAIT -> BLOCKED //notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED
}
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
//notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED return data;
put(data) - wait(), notify()
synchronized
를 통해 임계영역을 설정한다. 생산자 스레드는 락 획득을 시도한다.Object.wait()
를 사용해서 대기한다. 참고로 대기할 때는 락을 반납하고 대기한다. 그리고 대기에서 깨어나면 반복문에서 큐의 빈공간을 체크한다.wait()
를호출해서대기하는경우 RUNNABLE
WAITING
상태가된다.notify()
를 통해 저장된 데이터가 있다고 대기하는 스레드에 알려주어 야 한다. 예를 들어서 큐에 데이터가 없어서 대기하는 소비자 스레드가 있다고 가정하자. 이때 notify()
를 호 출하면 소비자 스레드는 깨어나서 저장된 데이터를 획득할 수 있다.take() - wait(), notify()
synchronized
를 통해 임계 영역을 설정한다. 소비자 스레드는 락 획득을 시도한다.Object.wait()
을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨 어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다.RUNNABLE
WAITING
상태가 된다.notify()
를 통해 큐에 저장할 여유 공간이 생겼다고, 대기하는 스레드에게 알려주어야 한다. 예를 들어서 큐에 데이터가 가득 차서 대기하는 생산자 스레드가 있다고 가정하자. 이때notify()
를 호출하면 생산자 스레드는 깨어나서 데이터를 큐에 저장할 수 있다.
Object - wait, notify - 한계
지금까지 살펴본 Object.wait()
, Object.notify()
방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify()
를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같 은 생산자를 깨우는 비효율도 발생할 수 있다.
public class BoundedQueueV4 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV4(int max) {
this.max = max;
}
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
condition.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
condition.signal();
} finally {
lock.unlock();
}
}
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
condition.signal();
return data;
} finally {
lock.unlock();
}
}
}
Condition
Condition condition = lock.newCondition()
에서 Condition
은 ReentarantLock
을 사용하는 스레드가 대기하는 스레드 공간이다.
condition.await()
Object.wait()
와 유사한 기능이다. 지정한 condition
에 현재 스레드를 대기(Waiting
)상태로 보관한다. 이때 ReentrantLock
에서 획득한 락을 반납하고 대기 상태로 condition
에 보관된다.
condition.signal()
Object.notify()
와 유사한 가능이다. 지정한 condition
에서 대기중인 스레드를 하나 깨운다. 깨어난 스레드는 condition
에서 빠져나온다.
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
Condition 분리
cinsumerCond:
생산자를 위한 스레드 대기 공간producerCond:
소비자를 위한 스레드 대기 공간put(data) - 생산자 스레드가 호출
producerCond.await()
를 호출해서 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관한다.consumerCond.signal()
를 호출해서 소비자 전용 스레드 대기 공간에 신호를 보낸다. 이렇게 하면 대기 중인 소비자 스레드가 하나 깨어나서 데이터를 소비할 수 있다.take() - 소비자 스레드가 호출
consumerCond.await()
를 호출해서 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관한다.producerCond.signal()
를 호출해서 생산자 전용 스레드 대기 공간에 신호를 보낸다.synchronized 대기
Blocked
상태로 락 획득 대기synchronized
를 시작할 때 락이 없으면 대기synchronized
를 빠져나갈 때 대기가 풀리며 락획득 시도Waiting
상태로 대기wait()
를 호출했을 때 스레드 대기 집한에서 대기notify()
를 호출했을 때 빠져나감ReentrantLock 대기
ReentrantLcok
의 대기 큐에서 관리Waiting
상태로 락획득 대기lock.lock
을 호출했을 때 락이 없으면 대기lock.unlock()
을 호출했을 때 대기가 풀리며 락 획득 시도, 락을 획득하면 대기 큐를 빠져 나감condition.await()
를 호출했을 때 condition
객체의 스레드 공간에서 관리Waiting
상태로 대기condition.signal()
을 호출했을 때 condition
객체의 스레드 대기 공간에서 빠져나감Throws Exception - 대기시 예외
IllegalStateException
예외를 던진다.NoSuchElementException
예외를 던진 다.NoSuchElementException
예외를 던진다.Special Value - 대기시 즉시 반환
false
를 반환한다.null
을 반환한다.null
을 반환한다.Blocks - 대기
Times Out - 시간 대기
false
를 반환한다.null
을 반환한다.