생산자 소비자 예제로 알아보는 스레드

이동건 (불꽃냥펀치)·2025년 1월 9일
0

생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 문제중 하나로 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다. 멀티스레드의 핵심을 제대로 이해하려면 반드시 생산자 소비자 문제를 이해하고, 올바른 해결 방안도 함께 알아두어야 한다.


기본 개념

  • 생산자: 데이터를 생성하는 역할을 한다. 앞서 프린터 예제에서 사용자의 입력을 프린터 큐에 전달하는 스레드가 생산자의 역할이다.
  • 소비자: 생성된 데이터를 사용하는 역할을 한다. 프린터 예제에서 프린터 큐에 전달된 데이터를 받아서 출력하는 스레드가 소비자 역할이다.
  • 버퍼: 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 예제에서 프린터 큐가 버퍼 역할이다.

문제 상황

  • 생산자가 너무 빠를 때: 버퍼가 가득 차서 더이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
  • 소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비었을 때는 소비자는 버퍼에 데이터가 들어올 때까지 기다려야 한다.


생산자 소비자 문제 - 예제 1 코드

 public class BoundedQueueV1 implements BoundedQueue {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;
    public BoundedQueueV1(int max) {
        this.max = max;
	}
    @Override
    public synchronized void put(String data) {
		if (queue.size() == max) {
			log("[put] 큐가 가득 참, 버림: " + data); 
            return;
		}
        queue.offer(data);
    }
    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
		}
        return queue.poll();
    }
    @Override
    public String toString() {
        return queue.toString();
    }
}
  • BoundedQueueV1: 한정된 버퍼 역할을 하는 가장 단순한 구현체이다. 이후에 버전이 점점 올라가면서 코드를 개선한다.
  • Queue,ArrayDeque: 데이터를 중간에 보관하는 버퍼로 큐를 사용한다. 구현체로는 ArrayDeque를 사용한다.
  • int max: 한정된 버퍼이므로 버퍼에 저장할 수 있는 최대 크기를 지정한다.
  • put(): 큐에 데이터를 저장한다. 큐가 가득 찬 경우 더는 데이터를 보관할 수 없으므로 데이터를 버린다.
  • take(): 큐의 데이터를 가져간다. 큐에 데이터가 없는 경우 null을 반환한다.
    -toString(): 버퍼 역할을 하는 queue정보를 출력한다.
import static util.MyLogger.log;
 public class ProducerTask implements Runnable {
     private BoundedQueue queue;
     private String request;
     public ProducerTask(BoundedQueue queue, String request) {
         this.queue = queue;
         this.request = request;
	}
     @Override
     public void run() {
        log("[생산 시도] " + request + " -> " + queue); 
        queue.put(request);
        log("[생산 완료] " + request + " -> " + queue);
	}
}
  • 데이터를 생성하는 스레드가 실행하는 클래스로 Runnable을 구현한다.
public class ConsumerTask implements Runnable {
    private BoundedQueue queue;
    public ConsumerTask(BoundedQueue queue) {
        this.queue = queue;
	}
     @Override
    public void run() {
		log("[소비 시도] ? <- " + queue);
		String data = queue.take();
		log("[소비 완료] " + data + " <- " + queue);
	} 
}
  • 데이터를 소비하는 소비자 스레드가 실행하는 클래스, Runnable을 구현한다.
public class BoundedMain {
	public static void main(String[] args) {
// 1. BoundedQueue 선택
		BoundedQueue queue = new BoundedQueueV1(2);
// 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택! 
		producerFirst(queue); // 생산자 먼저 실행 
        //consumerFirst(queue); // 소비자 먼저 실행
	}
	private static void producerFirst(BoundedQueue queue) {
		log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + "==");
		List<Thread> threads = new ArrayList<>();
		startProducer(queue, threads);
		printAllState(queue, threads);
		startConsumer(queue, threads);
		printAllState(queue, threads);
		log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + "=="); 
        
        }
   		private static void consumerFirst(BoundedQueue queue) {
			log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName()+ "==");
			List<Thread> threads = new ArrayList<>();
            startConsumer(queue, threads);
            printAllState(queue, threads);
            startProducer(queue, threads);
            printAllState(queue, threads);
            log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " =="); 
            }
            
   		 private static void startProducer(BoundedQueue queue, List<Thread> threads){
            System.out.println(); log("생산자 시작");
            for (int i = 1; i <= 3; i++) {
            	Thread producer = new Thread(new ProducerTask(queue, "data" + i),
				"producer" + i);
          		  threads.add(producer);
           		 producer.start();
            	sleep(100);
		} 
	}
      private static void startConsumer(BoundedQueue queue, List<Thread> threads){
			System.out.println(); 
            log("소비자 시작");
			for (int i = 1; i <= 3; i++) {
           		 threads.add(consumer);
            	consumer.start();
           		 sleep(100);
		} 
	}
 }

producerFirst 코드 분석

  • threads: 스레드의 결과 상태를 한꺼번에 출력하기 위해 생성한 스레드를 보관해둠
  • startProducer: 생산자 스레드를 3개 만들어서 실행한다.
  • startConsumer: 소비자 스레드를 3개 만들어서 실행한다.

  • p는 생산자 스레드를 뜻한다.
  • c는 소비자 스레드를 뜻한다.
  • 임계영역은 synchronized 영역을 뜻한다. 스레드가 이 영역에서 들어가려면 모니터 락이 필요하다.

생산자 스레드 실행 시작

  • p1,p2는 값을 생산해서 boundedQueue에 저장을 완료했다.
  • p3은 큐가 가득 차서 데이터를 추가할 수 없다. 따라서 put() 내부에서 data3은 버린다.

소비자 스레드 실행 시작

  • c1,c2가 순차적으로 데이터를 소비했다.
  • 하지만 c3은 큐에 데이터가 없기 때문에 데이터를 획득할 수 없다. 대신에 null을 반환받는다.

문제점

  • 생산자 스레드가 먼저 실행되는 경우 p3이 보관하는 data3은 버려지고 c3은 데이터를 받지 못한다.
  • 소비자 스레드가 먼저 실행되는 경우 c1,c2,c3은 데이터를 받지 못한다. 그리고 p3이 보관하는data3은 버려진다.



생산자 소비자 문제 - 예제 2 코드

Object-wait,notify

synchronized를 사용한 임계영역에서 락을 가지고 무한 대기하는 문제는 Object클래스에 해결방안이 있다. Object 클래스는 무한 대기 문제를 해결하는 wait(),notify()라는 메서드를 제공한다.

  • Object.wait()
    • 현재 스레드가 가진 락을 반납하고 대기한다(Waiting)
    • 현재 스레드를 대기(Waiting) 상태로 전환한다. 이 메서드는 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다. 호출한 스레드는 락을 반납하고 다른 스레드가 해당 락을 획득 할 수 있도록한다. 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify()/notifyAll()을 호출할 때 까지 대기상태를 유지한다.


  • Object.notify()
    • 대기중인 스레드 하나를 깨운다.
    • 이 메서드는 synchronized블록이나 메서드에서 호출되어야한다. 깨운 스레드는 다시 락을 획득할 기회를 얻게된다. 만약 대기중인 스레드가 여러개라면 그중 하나만이 깨워지게 된다.

  • Object.notifyAll()
    • 대기 중인 모든 스레드를 깨운다.
    • 이 메서드 역시 synchronized 블럭이나 메서드에서 호출되어야 하며, 모든 대기 중인 스레드가 락을 획득할 기회를 얻게 된다.

코드 변경

 public synchronized void put(String data) {
        while (queue.size() == max) {
		log("[put] 큐가 가득 참, 생산자 대기");
        try {
			wait(); // RUNNABLE -> WAITING, 락 반납
			log("[put] 생산자 깨어남");
		} catch (InterruptedException e) { 
        	throw new RuntimeException(e);
		} 
	}
	queue.offer(data);
	log("[put] 생산자 데이터 저장, notify() 호출"); 
    notify(); 
// 대기 스레드, WAIT -> BLOCKED //notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED
}

  public synchronized String take() {
        while (queue.isEmpty()) {
		log("[take] 큐에 데이터가 없음, 소비자 대기"); 
        try {
			wait();
			log("[take] 소비자 깨어남");
		} catch (InterruptedException e) { 
        throw new RuntimeException(e);
		} 
	}
	String data = queue.poll();
	log("[take] 소비자 데이터 획득, notify() 호출"); 
    notify(); // 대기 스레드, WAIT -> BLOCKED 
//notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED return data;

put(data) - wait(), notify()

  • synchronized를 통해 임계영역을 설정한다. 생산자 스레드는 락 획득을 시도한다.
  • 락을 획득한 생산자 스레드는 반복해서 큐에 빈공간이 생기는 지 체크한다.
    만약 빈공간이 생기면 Object.wait()를 사용해서 대기한다. 참고로 대기할 때는 락을 반납하고 대기한다. 그리고 대기에서 깨어나면 반복문에서 큐의 빈공간을 체크한다.
  • wait() 를호출해서대기하는경우 RUNNABLE WAITING 상태가된다.
  • 생산자가 데이터를 큐에 저장하고 나면 notify() 를 통해 저장된 데이터가 있다고 대기하는 스레드에 알려주어 야 한다. 예를 들어서 큐에 데이터가 없어서 대기하는 소비자 스레드가 있다고 가정하자. 이때 notify() 를 호 출하면 소비자 스레드는 깨어나서 저장된 데이터를 획득할 수 있다.

take() - wait(), notify()

  • synchronized 를 통해 임계 영역을 설정한다. 소비자 스레드는 락 획득을 시도한다.
  • 락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다. 만약 데이터가 없다 면 Object.wait() 을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨 어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다.
  • 대기하는 경우 RUNNABLE WAITING 상태가 된다.
  • 소비자가 데이터를 획득하고 나면 notify() 를 통해 큐에 저장할 여유 공간이 생겼다고, 대기하는 스레드에게 알려주어야 한다. 예를 들어서 큐에 데이터가 가득 차서 대기하는 생산자 스레드가 있다고 가정하자. 이때
    notify() 를 호출하면 생산자 스레드는 깨어나서 데이터를 큐에 저장할 수 있다.




Object - wait, notify - 한계

지금까지 살펴본 Object.wait() , Object.notify() 방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify() 를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같 은 생산자를 깨우는 비효율도 발생할 수 있다.





생산자 소비자 문제 - 예제 2 코드


문제를 해결하는 것의 핵심은 스레드는 데이터를 생성하고, 대기중인 소비자 스레드에게 알려줘야 한다는 점이다. 반대로 소비자 스레드는 데이터를 소비하고 대기중인 생산자 스레드에게 알려주면 된다.이는 생산자와 소비자 스레드가 대기하는 공간을 둘로 나누면 해결된다. 생산자 스레드가 데이터를 생산하면 소비자 스레드에게 알려주고 반대로 소비자 스레드가 데이터를 소비하면 생산자 스레드에게 알려주면 된다. `Lock,ReentrantLock`구현체를 사용하면 공간을 둘로 나눌 수 있다.
public class BoundedQueueV4 implements BoundedQueue {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;
    public BoundedQueueV4(int max) {
        this.max = max;
	}

    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
				log("[put] 큐가 가득 참, 생산자 대기"); 
                try {
                    condition.await();
					log("[put] 생산자 깨어남");
				} catch (InterruptedException e) { 
                	throw new RuntimeException(e);
				} 
			}
			queue.offer(data);
			log("[put] 생산자 데이터 저장, signal() 호출"); 
            condition.signal();
        } finally {
            lock.unlock();
		} 
	}
     public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
				log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                  condition.await();
				log("[take] 소비자 깨어남");
				} catch (InterruptedException e) { 
                	throw new RuntimeException(e);
				} 
			}
			String data = queue.poll();
			log("[take] 소비자 데이터 획득, signal() 호출"); 
            condition.signal();
			return data;
         } finally {
             lock.unlock();
		} 
	}
}

Condition
Condition condition = lock.newCondition()에서 ConditionReentarantLock을 사용하는 스레드가 대기하는 스레드 공간이다.

condition.await()
Object.wait()와 유사한 기능이다. 지정한 condition에 현재 스레드를 대기(Waiting)상태로 보관한다. 이때 ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관된다.

condition.signal()
Object.notify()와 유사한 가능이다. 지정한 condition에서 대기중인 스레드를 하나 깨운다. 깨어난 스레드는 condition에서 빠져나온다.


생산자-소비자 대기 공간 분리

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();
    
     @Override
    public void put(String data) {
        lock.lock();
        try {
			while (queue.size() == max) { 
            	log("[put] 큐가 가득 참, 생산자 대기"); 
                try {
                	producerCond.await();
					log("[put] 생산자 깨어남");
				} catch (InterruptedException e) { 
                	throw new RuntimeException(e);
				} 
			}
			queue.offer(data);
			log("[put] 생산자 데이터 저장, consumerCond.signal() 호출"); 						
            consumerCond.signal();
        } finally {
            lock.unlock();
		} 
	}
    
       @Override
    public String take() {
     	lock.lock();
        try {
			while (queue.isEmpty()) {
			log("[take] 큐에 데이터가 없음, 소비자 대기"); 
            try {
            	consumerCond.await();
				log("[take] 소비자 깨어남");
			} catch (InterruptedException e) { 
            	throw new RuntimeException(e);
			} 
		}
		String data = queue.poll();
		log("[take] 소비자 데이터 획득, producerCond.signal() 호출"); 			
        producerCond.signal();
		return data;
         } finally {
             lock.unlock();
		} 	
	}
    

Condition 분리

  • cinsumerCond: 생산자를 위한 스레드 대기 공간
  • producerCond: 소비자를 위한 스레드 대기 공간

put(data) - 생산자 스레드가 호출

  • 큐가 가득찬 경우: producerCond.await()를 호출해서 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관한다.
  • 데이터를 저장한 경우: 생산자가 데이터를 생산하면 큐에 데이터가 저장되고, 이때 소비자를 깨운다.
    consumerCond.signal()를 호출해서 소비자 전용 스레드 대기 공간에 신호를 보낸다. 이렇게 하면 대기 중인 소비자 스레드가 하나 깨어나서 데이터를 소비할 수 있다.

take() - 소비자 스레드가 호출

  • 큐가 빈경우: consumerCond.await()를 호출해서 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관한다.
  • 데이터를 소비한 경우: 소비자가 데이터를 소비한 경우 큐에 여유 공간이 생긴다. 따라서 생산자를 깨우는 것이 좋으니 producerCond.signal() 를 호출해서 생산자 전용 스레드 대기 공간에 신호를 보낸다.



스레드의 대기(synchronized vs ReentrantLock)

synchronized 대기

  • 대기1: 락획득 대기
    • Blocked 상태로 락 획득 대기
    • synchronized를 시작할 때 락이 없으면 대기
    • 다른 스레드가 synchronized를 빠져나갈 때 대기가 풀리며 락획득 시도
  • 대기2: wait() 대기
    • Waiting상태로 대기
    • wait()를 호출했을 때 스레드 대기 집한에서 대기
    • 다른 스레드가 notify()를 호출했을 때 빠져나감

ReentrantLock 대기

  • 대기1: ReentrantLock 락 획득 대기
    • ReentrantLcok의 대기 큐에서 관리
    • Waiting상태로 락획득 대기
    • lock.lock을 호출했을 때 락이 없으면 대기
    • 다른 스레드가 lock.unlock()을 호출했을 때 대기가 풀리며 락 획득 시도, 락을 획득하면 대기 큐를 빠져 나감
  • 대기2: await() 대기
    • condition.await()를 호출했을 때 condition객체의 스레드 공간에서 관리
    • Waiting 상태로 대기
    • 다른 스레드가 condition.signal()을 호출했을 때 condition객체의 스레드 대기 공간에서 빠져나감



BlockingQueue - 기능 설명


Throws Exception - 대기시 예외

  • add(e): 지정된 요소를 큐에 추가하며, 큐가 가득 차면 IllegalStateException 예외를 던진다.
  • remove(): 큐에서 요소를 제거하며 반환한다. 큐가 비어 있으면 NoSuchElementException 예외를 던진 다.
  • element(): 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면
    NoSuchElementException 예외를 던진다.

Special Value - 대기시 즉시 반환

  • offer(e): 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면 false 를 반환한다.
  • poll(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 null 을 반환한다.
  • peek(): 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면 null 을 반환한다.

Blocks - 대기

  • put(e): 지정된 요소를 큐에 추가할 때까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기한다.
  • take(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기한다.
  • Examine (관찰): 해당 사항 없음.

Times Out - 시간 대기

  • offer(e, time, unit): 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간이 초과되면 false 를 반환한다.
  • poll(time, unit): 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다리다가 시간이 초과되면 null 을 반환한다.
  • Examine (관찰): 해당 사항 없음.
profile
자바를 사랑합니다

0개의 댓글

관련 채용 정보