Java - 생산자 소비자 문제(Thread)

INHEES·2025년 1월 1일
0

금일은 Java 의 생산자 소비자 문제에 대해 알아보겠습니다.

생산자 소비자 문제는 주로 멀티 스레드 프로그래밍에서 자주 등장하는 동시성 문제입니다.

즉 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다루며 멀티스레드의 핵심을 제대로 이해하기 위해서는 해당 개념의 이해가 필요합니다.


목차

  • 생산자 소비자 문제
  • 생산자 우선
  • 소비자 우선
  • wait, notify

생산자 소비자 문제

기본개념

생산자(Producer): 데이터를 생성하는 역할을 한다. 예를 들어, 파일에서 데이터를 읽어오거나 네트워크에서 데
이터를 받아오는 스레드가 생산자 역할을 할 수 있다.

소비자(Consumer): 생성된 데이터를 사용하는 역할을 한다. 예를 들어, 데이터를 처리하거나 저장하는 스레드
가 소비자 역할을 할 수 있다.

버퍼(Buffer): 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산
자와 소비자가 이 버퍼를 통해 데이터를 주고받는다.

문제상황

생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버
퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.

소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가
비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.


생산자 소비자 문제 예제 코드

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV1(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        if (queue.size() == max) {
            log("[put] 큐가 가득 참, 버림: " + data);
            return;
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
        }

        return queue.poll();
    }
public class ProducerTask implements Runnable {

    private BoundedQueue queue;
    private String request;

    public ProducerTask(BoundedQueue queue, String request) {
        this.queue = queue;
        this.request = request;
    }

    @Override
    public void run() {
        log("[생산 시도] " + request + " -> " + queue);
        queue.put(request);
        log("[생산 완료] " + request + " -> " + queue);
    }
}
public class ConsumerTask implements Runnable {

    private BoundedQueue queue;

    public ConsumerTask(BoundedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        log("[소비 시도]     ? <- " + queue);
        String data = queue.take();
        log("[소비 완료] " + data + " <- " + queue);
    }
}

해당 순서로는 버퍼, 생산자, 소비자의 코드이며 main method 에서 실행하도록 하겠습니다.

    public static void main(String[] args) {

        BoundedQueue queue = new BoundedQueueV6_4(2);

        producerFirst(queue); // 소비자도 마찬가지로 실행 
    }

    private static void producerFirst(BoundedQueue queue) {
        log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startProducer(queue, threads);
        printAllState(queue, threads);
        startConsumer(queue, threads);
        printAllState(queue, threads);
        log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }
    private static void startProducer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("생산자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
            threads.add(producer);
            producer.start();
            sleep(100);
        }
    }

    private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("소비자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            sleep(100);
        }
    }

    private static void printAllState(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("현재 상태 출력, 큐 데이터: " + queue);
        for (Thread thread : threads) {
            log(thread.getName() + ": " + thread.getState());
        }
    }

생상자 우선

data1, data2 가 큐에 저장하였고 한정된 크기의 버퍼 역할을 하는 큐가 가득 차 있기 때문에 data3 의 처리방법에 대한 대안 방안은 무엇일까?

  • 단순하게 큐에 빈 공간이 생길 때 까지 data3 의 p3 스레드가 기다리는 방법이 있다.
  • 체크 방법으로는 큐에 빈 공간이 생겼는지 주기적으로 체크하고 만약 빈 공간이 없다면 sleep() 를 짧게 사용해서 잠시 대기하는 방법이 있다.

소비자 입장에서 소비자 스레드가 데이터를 획득하려고 할때 큐에 데이터가 없을 때는 다음과 같은 대안이 있다.

  • 소비자 입장에서 큐에 데이터가 없다면 기다리는 것도 대안이다.
  • null 을 받지 않은 대안이며 물론 생산자 스레드가 계속해서 데이터를 생산한다는 가정이 필요하다.
  • 마찬가지로 sleep 함수를 사용해서 체크하는 방법이 있다.
    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기");
            sleep(1000);
        }
        queue.offer(data);
    }

해당 위의 코드를 생산자 우선 입장에서 실행해보면 소비자 스레드는 데이터를 받지 못하고 있는 상황이 발생한다.

생산자 스레드 p3 는 락은 획득한 상태로 큐의 빈 자리를 반복해서 확인하며 스레드의 상태는 RUNNABLE -> TIMED_WAITRING 상태가 됩니다.

해당 부분에서의 핵심은 임계 영역에 진입한 스레드가(락을 획득한 상태) 무한정 대기한다는 문제점이 발생하며 소비자 스레드인 c1 은 p3 때문에 임계 영역에 들어가지 못해 데이터를 계속 받지 못하여 null 값만 받는 상태가 되는 것이다.

결과적으로 소비자 스레드인 c1은 p3가 락을 반납할 때 까지 BLOCK 상태로 대기하게 되는 것이다.

위의 문제점의 방안으로 락을 가지고 있는 임계 영역안에 있는 스레드가 락을 양보하면 다른 스레드가 버퍼에 값을 채우거나 버퍼의 값을 가져가 락을 반납가능할 것 같습니다.

함수로는 Object.wait(), Object.notify() 함수로 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보 가능하다.


소비자 우선

소비자 스레드 먼저 실행의 경우 소비자의 스레드는 생산자로 부터 데이터를 받지 못한다. 즉 null 값을 받게 된다.

생산자 스레드인 p3 가 보관한는 data3 는 버려지는 문제가 발생한다. 해당 문제에 대한 대안은 다음과 같습니다.

  • 소비자 입장에서도 큐에 데이터가 없다면 sleep 을 활용하여 대기하는 방법이 있습니다.
    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            sleep(1000);
        }
        return queue.poll();
    }

해당코드를 적용해도 생산자 우선과 마찬가지로 소비자 우선도 c1 의 스레드가 데이터를 계속 확인하기 때문에 락을 반납할 때 까지 무한 대기하는 문제점이 발생한다.


wait(), notify()

Object.wait()

  • 현재 스레드가 가진 락을 반납하고 대기(WAITING) 합니다.
  • 현재 스레드를 대기( WAITING ) 상태로 전환한다. 이 메서드는 현재 스레드가 synchronized 블록이나
    메서드에서 락을 소유하고 있을 때만 호출할 수 있다. 호출한 스레드는 락을 반납하고, 다른 스레드가 해당
    락을 획득할 수 있도록 한다.
  • 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify() 또는
    notifyAll() 을 호출할 때까지 대기 상태를 유지한다.

Object.notify(), notifyAll()

  • 대기 중인 스레드를 하나 또는 모든 스레드를 개우는 역할을 한다.
  • 해당 메서드는 synchronized 블록이나 메서드에서 호출되어야 하며 락을 획득할 수 있는 기회를 얻게 된다.
    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기");
            try {
                wait(); // RUNNABLE -> WAITING, 락 반납
                log("[put] 생산자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data);
        log("[put] 생산자 데이터 저장, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            try {
                wait();
                log("[take] 소비자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        String data = queue.poll();
        log("[take] 소비자 데이터 획득, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
        return data;
    }

코드 설명

  • wait() 를 호출해서 대기하는 경우 RUNNABLE WAITING 상태가 된다.
  • 생산자가 데이터를 큐에 저장하고 나면 notify() 를 통해 저장된 데이터가 있다고 대기하는 스레드에 알려주어야 한다.

생산자 우선 코드 실행시

소비자 우선 코드 실행시

정리

멀티 스레드 중에 어떤 스레드가 깨어나는가는 JVM 스펙에 명시되어 있다. 때문에 JVM 버전 환경들에 따라서 달라집니다.

대기 집합에 있는 스레드가 깨어난다고 바로 작동하는 것은 아니다.

  • 생산자 우선 코드

    • wait() , notify() 덕분에 스레드가 락을 놓고 대기하고, 또 대기하는 스레드를 필요한 시점에 깨울 수 있습니다.
    • 생산자 스레드가 큐가 가득차서 대기해도, 소비자 스레드가 큐의 데이터를 소비하고 나면 알려주기 때문에, 최적의 타이밍에 깨어나서 데이터를 생산할 수 있습니다.
    • 덕분에 최종 결과를 보면 p1 , p2 , p3 는 모두 데이터를 정상 생산하고, c1 , c2 , c3 는 모두 데이터를 정상 소비할수 있습니다.
  • 소비자 우선 코드

    	- c1,c2,c3 는 대기집합에 존재하며 c1의 작업이 완료되면 c2를 깨우지만 c2는 바로 큐에 데이터가 없다는 점이다.
    • 결국 c2 는 wait() 를 호출해서 대기 상태로 변하며 다시 대기 집합에 들어간다.
    • 이와 같이 c1 이 같은 소비자인 c2를 깨우는 것은 상당히 비효율적이다.

이와 같은 이유로 생산자는 소비자를 소비자는 생산자를 깨우는 코드로 변환이 필요하다.


notify(), wait() 의 한계

생산자 우선 코드의 입장에서 대기집합에 c1 ~ c3, p2, p3 가 있다고 하면 p1 의 스레드의 경우 p2를 깨울 수 있는 것이다.

때문에 불필요한 스레드의 이동이 필요하다.

notify() 의 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 스레드 기아 문제 가 있다. 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 실행되지 않는 상황을 말합니다.

단순한 해결 방법으로 notifyAll() 을 사용하는 방법도 있다. 이방법도 마찬가지로 해당되지 않는 스레드는 다시 스레드 대기 집합으로 들어가야 되는 비효율이 존재한다.


참고자료

inflearn

profile
이유를 찾아보자

0개의 댓글