멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제중 하나, 여러 스레드가 동시에 데이터를 생산, 소비하는 상황을 다룸
생산자 소비자 문제를 이해, 올바른 해결방안 함께 알아둬야 함

문제상황
public interface BoundedQueue {
void put(String data);
//버퍼에 보관된 값 가져감
String take();
}
public class ConsumerTask implements Runnable{
private BoundedQueue boundedQueue;
public ConsumerTask(BoundedQueue boundedQueue) {
this.boundedQueue = boundedQueue;
}
@Override
public void run() {
log("[소비 시도] ? <- " + boundedQueue);
String data = boundedQueue.take();
log("[소비 완료] ? <- " + boundedQueue);
}
}
public class ProducerTask implements Runnable{
private BoundedQueue queue;
private String request;
public ProducerTask(BoundedQueue queue, String request) {
this.queue = queue;
this.request = request;
}
@Override
public void run() {
log("[생산시도]" + request + " -> " + queue);
queue.put(request);
log("[생산완료]" + request + " -> " + queue);
}
}
public class BoundedMain {
public static void main(String[] args) {
// BoundedQueue 선택
//BoundedQueue queue = new BoundedQueueV1(2);
//BoundedQueue queue = new BoundedQueueV2(2);
BoundedQueue queue = new BoundedQueueV3(2);
// 생산자 소비자 실행 순서 선택, 반드시 하나만 선택
producerFirst(queue);
//consumerFirst(queue);
}
private static void producerFirst(BoundedQueue queue) {
log("== [생산자 먼저 실행] 시작 " + queue.getClass().getSimpleName() + " ==");
java.util.List<Thread> threads = new ArrayList<>();
startProducer(queue, threads);
printAllState(queue, threads);
startConsumer(queue, threads);
printAllState(queue, threads);
log("== [생산자 먼저 실행] 종료 " + queue.getClass().getSimpleName() + " ==");
}
private static void consumerFirst(BoundedQueue queue) {
log("== [소비자 먼저 실행] 시작 " + queue.getClass().getSimpleName() + " ==");
List<Thread> threads = new ArrayList<>();
startConsumer(queue, threads);
printAllState(queue, threads);
startProducer(queue, threads);
printAllState(queue, threads);
log("== [소비자 먼저 실행] 종료 " + queue.getClass().getSimpleName() + " ==");
}
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i), ("producer" + i)); //data 생성
threads.add(producer);
producer.start();
sleep(100);
}
}
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("소비자 시작");
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
threads.add(consumer);
consumer.start();
sleep(100);
}
}
//스레드에 담긴 값들을 출력
private static void printAllState(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("현재 상태 출력, 큐 데이터 : " + queue);
for (Thread thread : threads) {
log(thread.getName() + ": " + thread.getState());
}
}
}
public class BoundedQueueV1 implements BoundedQueue {
//중간에 데이터를 보관하는 Queue
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
//버퍼의 개수 제한
public BoundedQueueV1(int max) {
this.max = max;
}
//한번에 한 스레드 사용 -> 임계값
//핵심 공유 자원 -> ArrayQueue, 한번에 하나의 스레드만 put, take
//중간에 queue의 size가 변하면 안됨
//queue에 max만큼 찬 경우
@Override
public synchronized void put(String data) {
if (queue.size() == max) {
log("[put] 큐가 가득 참, 버림: " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if (queue.isEmpty()) {
return null;
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
원칙적으로 toString에도 synchronized 적용 해야 함, 그래야 toString()을 통한 조회 시점에도 정확한 데이터를 조회 가능.
임계영역
핵심 공유 자원은 바로 queue(ArrayDeque), 여러 스레드가 접근할 예정 synchornized를 사용해서 한번에 하나의 스레드만 put or task 실행 할 할 수 있도록, 안전한 임계영역 생성
BoundedQueue 선택
버퍼의 크기는 2를 사용, 버퍼에는 데이터 2개 까지만 보관
생산자 소비자 실행 순서 선택, 단 하나만

생산자 스레드 실행 시작
p1이 lock을 획득 하고, data1 queue에 저장됨, 그리고 lock을 반납 하고 terminate
p2이 lock을 획득 하고, data2 queue에 저장됨, 그리고 lock을 반납 하고 terminate
p3이 lock을 획득 하고, data3 queue에 저장 안됨, 그리고 버림
소비자 스레드 시작
c1이 lock을 획득 하고, queue에서 data1을 꺼내간다, 그리고 lock을 반납 하고 terminate
c2이 lock을 획득 하고, queue에서 data2을 꺼내간다, 그리고 lock을 반납 하고 terminate
c3이 lock을 획득 하고, queue에는 data 없음, 그리고 버림
큐에 데이터가 없다면 기다리는 방법도 하나.
그렇다면 sleep을 짧게 사용해서 잠시 대기, 깨어난 다음에 다시 반복문에 큐에 데이터가 있는지 체크
한정된 버퍼 문제는 데이터가 가득 찬 상황에 데이터를 생산해서 추가할 때도 문제 발생, 큐에 데이터가 없는데 데이터를 소비할 때도 문제가 발생
큐에 data가 없으므로 null을 반환, 결과적으로 소비자는 값을 받지 못하고 종료
생성자가 데이터를 넣어준다고 가정 -> 스레드는 큐에 데이터가 추가될때 까지 기다리는 것
public class BoundedQueueV2 implements BoundedQueue {
//중간에 데이터를 보관하는 Queue
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
//버퍼의 개수 제한
public BoundedQueueV2(int max) {
this.max = max;
}
//한번에 한 스레드 사용 -> 임계값
//핵심 공유 자원 -> ArrayQueue, 한번에 하나의 스레드만 put, take
//중간에 queue의 size가 변하면 안됨
//queue에 max만큼 찬 경우
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참 : 생산자 잠시 대기" );
sleep(1000);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 data 없음, 소비자 대기");
sleep(1000);
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
put(data) 데이터를 버리지 않는 대안
큐에 공간이 생길때 까지, 생성자 스레드가 기다리면 됨,
sleep 기능을 활용
take 큐에 데이터 없다면 기다리는 방법
큐에 데이터가 없을때 null을 받지 않는 대안은 큐에 데이터가 추가될 때 까지 소비자 스레드가 기다리는 것
문제
데이터 들어가는 것은 이전 v1 들어가는 것과 동일
하지만 p3에서 큐가 가득 차있기 때문에 생산자 대기, -> TimedWaiting 1초마다 깨어나서 확인
그리고 c1은 임계영역에 들어가기 위해 lock 획득 시도, 하지만 락은 p3가 갖고 있음
-> 무한 대기 문제가 발생한다.
결과적으로 c1,c2,c3 모두 Blocked 상태로 대기
소비자 먼저 실행 분석
소비자 c1은 임계영역에 들어가기 위해 락 획득. 하지만 data 없음 그래서 sleep
무한 반복 시작
c1이 락 키를 갖고 있기 때문에 c2, c3의 값은 들어갈 수 없음

현재 스레드가 가진 락을 반납, 대기
스레드를 대기 상태로 전환, 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을때만 호출. 호출한 스레드는 락을 반납, 다른 스레드가 해당 락을 획득 할 수 있도록 함 -> 다른 스레드가 notify 또는 notifyAll을 호출할 때 까지 대기상태 유지
대기중인 스레드중 하나를 깨운다.
synchronized 블록이나 메서드에서 호출 되어야 함. 깨운 스레드는 락을 다시 획들할 기회를 얻는다.
만약 대기중인 스레드가 여러개 라면 하나만 깨워지게 됨
대기중인 모든 스레드를 깨운다.
모든 스레드를 깨워야 할 필요가 있을 경우 유용
public class BoundedQueueV3 implements BoundedQueue {
//중간에 데이터를 보관하는 Queue
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
//버퍼의 개수 제한
public BoundedQueueV3(int max) {
this.max = max;
}
//한번에 한 스레드 사용 -> 임계값
//핵심 공유 자원 -> ArrayQueue, 한번에 하나의 스레드만 put, take
//중간에 queue의 size가 변하면 안됨
//queue에 max만큼 찬 경우
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참 : 생산자 잠시 대기" );
try {
wait();
log("[put] 생산자 깨어님");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
queue.offer(data);
log("[put] 생신지 데이터 저장, notify() 호출");
notify(); //wait 한 애가 깨어남. 대기 스레드 wait -> Blocked를 풀어버림
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 data 없음, 소비자 대기");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 획득, notify() 호출");
notify();
//notifyAll();
return data;
}
@Override
public String toString() {
return queue.toString();
}
}
put(data) - wait(), notify()
생산자 스레드는 락 획득 시도
락을 획득한 생산자 스레드는 반복문을 사용해 큐에 빈공간이 생기는지 주기적으로 check
만약 빈 공간이 없다면, wait을 사용해 대기, 대기할 때 락을 반납하고 대기
wait을 호출 -> Runnable -> Waiting 상태가 됨
생산자가 데이터를 큐에 저장하고 나면 notify()를 통해 저장된 데이터가 있다고 대기하는 스레드에 알려주어
야 함.
take() - wait(), notify()
데이터가 없다면 Object.wait()을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기
대기의 경우 Runnable -> Waiting
생산자는 생산을 완료하면 notify()로 대기하는 스레드를 깨워서 생산된 데이터를 가져가게 하고, 소비자는 소비를 완료하면 notify()로 대기하는 스레드를 깨워서 데이터를 생산
synchronized 임계 영역 안에서 Object.wait을 호출하면 스레드는 대기상태에 들어간다. 이렇게 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합. 모든 객체는 각자의 대기 집합을 갖고 있음
모든 객체는 락과 대기 집합을 갖고 있음. 둘은 한쌍으로 사용 따라서 락을 획득한 객체의 대기 집합을 사용 -> 구현 인스턴스의 락과 대기집합을 사용
(p1, p2)이 락을 획득하고 큐에 데이터를 저장
큐에 데이터가 추가 되었기 때문에 스레드 대기 집합에 -> notify 사용
현재 대기 집합에는 스레드가 없으므로 아무 일도 발생 X, 만약 소비자 스레드가 대기 집합에 있었다면, 깨어나서 큐에 있는 data 소비
p3 데이터가 생산하려고 하는데 큐가 가득 참 -> wait
락을 반납,
스레드의 상태가 Runnable -> Waiting
스레드 대기 집합에서 관리
소비자 스레드 실행 시작
소비자 스레드가 데이터 획득, 큐에 데이터를 보관할 빈자리 생성
notify -> 스레드 대기 집합실에 이 사실을 알려줌
대기 집합에 있는 스레드 중 하나 깨움
깨어난 스레드는 여전히 임계 영역, -> 바로 작동하는 것은 아님
임계 영역에 있는 코드를 실행하려면 락 필요. p3 대기 집합에서는 나가지만, 여전히 임계 영역에 있으므로, 락을 획득 하기 위해 BLOCKED 상태로 대기
c1이 data를 소비하고 data3가 락키를 획득해서 queue에 저장
그 이후 c2, c3 모두 실행 됨
소비자 우선 실행
우선 큐에 데이터가 없기 때문에 c1,2,3 모두 스레드 대기 집합
이후 생산자가 큐에 데이터를 생산하면 notify를 통해 스레드를 하나씩 깨워서 데이터 소비
p1 락 획득 -> 큐에 데이터를 생산, 큐에 데이터가 있기 대문에 소비자를 하나씩 깨울 수 있음
notify를 통해 스레드 대기 잡합에 이 사실을 알려줌
하지만 소비자 c1,2,3 중 어떤게 깨어날지는 순서를 보장하지 않음
p1이 락 키를 반납 -> c1이 락키를 획득 후 data 꺼내감
그 후 notify를 통해 c2를 깨운다. 하지만 data에는 아무런 값이 없기 때문에 락을 획득 했다가 다시 스레드 대기 집합에 들어간다.
그 후 p2의 값이 큐 데이터에 들어가고, 다시 notify를 통해 스레드 대기 집합에 알려줘 c2가 data를 획득해 온다.
c3도 같은 과정이 반복 됨
최종적으로 보면 p1,2,3 & c1,2,3 모두 소비 된 것을 확인 가능
단, 소비자의 경우 큐에 데이터가 없을 때 CPU 자원만 소비하고, 다시 대기 집합에 들어가는 비효율성 발생
notify -> 같은 스레드를 깨울때 비효율성이 발생
스레드 기아
어떤 스레드가 깨어날 지 모르기 때문에 스레드 기아문제가 발생
notifyAll을 사용하면 일시적인 방법으로 해결이 가능 할 수도 있음
대기 집합에 있는 모드 스레드를 깨워서 락을 우선 획득, 획득하지 못하면 BLOCKED
결과적으로 notifyAll() 을 사용해서 스레드 기아 문제는 막을 수 있지만, 비효율을 막지는 못한다.