Producer , Comsumer 패턴

이승준·2025년 3월 4일

JAVA

목록 보기
3/4
post-thumbnail

Producer, Consumer 패턴이란?

  • Producer-Consumer 패턴이란 대부분의 상용 시스템 구현 과정에서 주로 사용되는 패턴이다.
  • 하나의 공유된 큐안에서 자원을 생성,소비하는 과정으로 동작한다.




Producer, Consumer 패턴 활용 사례


1. 데이터베이스 커넥션 풀

  • 연결지향성으로 DB와 통신할때 매 요청마다 커넥션 객체를 생성해서 처리하는 것은 TCP 소켓 연결 과정에서 Three-way handshake가 발생하기에 동작이 무겁다.

  • Producer,Consumer 패턴을 사용하면 미리 일정 개수의 커넥션 객체를 Producer가 생성해두고, Consumer가 사용자의 요청을 수행하기 위해 커넥션 객체를 사용한다.

  • EX. HikariCP


2. 웹 서버의 스레드 풀

  • Connection Pool과 같이 쓰레드도 요청마다 쓰레드를 생성하는것은 운영체제에서 새로운 커널 스레드를 생성해야하기 때문에 성능 저하를 일으킨다.
  • 따라서 Producer는 서버를 실행하는 과정에서 미리 일정 개수의 스레드를 생성해두고, 웹 서버는 사용자의 HTTP 요청을 큐에 순서대로 적재

Thread 생성 과정
1. JVM이 OS에 새로운 스레드 요청 (pthread_create)
2. OS 커널이 스레드 메타데이터 할당 (PCB, TCB 생성)
3. 메모리 할당 (스택, 힙 설정)
4. 스케줄러에 추가하여 CPU에서 실행 가능 상태로 변경
5. 컨텍스트 스위칭을 통해 실행


Thread 메모리 할당

  • 자바에서 쓰레드를 생성하면 각 스레드는 별도의 스택 메모리를 할당받는다.
  • 스레드 개수가 많아질수록 메모리 사용량이 증가하여 OutOfMemoryError가 발생할 수 있다.
  • 👉 너무 많은 스레드를 생성하면 JVM이 더 이상 메모리를 할당할 수 없어서 성능 저하 및 OOM 발생


컨텍스트 스위칭 비용

  • 운영체제가 멀티태스킹을 위해 CPU 코어를 스레드에 분배한다.
  • 동시에 실행할 수 있는 스레드 개수가 정해져있고, 하나의 스레드에서 다른 스레드로 전환할 때 컨텍스트 스위칭 비용이 발생한다.
  • 너무 많은 스레드를 생성하면 컨텍스트 스위칭이 빈번하게 발생하여 오히려 성능이 나빠짐

결론: 적절한 양의 스레드를 생성해서 스레드 풀로 관리하여 새 스레드를 생성하는 비용을 줄임과 동시에, 너무 많은 스레드로 인한 성능 저하를 방지할 수 있음.



3. 로그 처리 시스템

  • log.info("사용자 로그인");과 같이 로그를 처리하는 코드가 실행될 때마다, 로그를 파일이나 DB에 저장하는 과정에서 성능 저하 발생.

  • Producer는 로그 메시지를 큐에 적재하고, Consumer는 비동기적으로 로그 메시지를 처리해준다.
    ex) Logback, Log4j


4. 메시지 큐 시스템

로그 처리 시스템과 유사하게 파일 업로드, 이메일 전송 등과 같이 백엔드 서버에서 처리 시간이 오래걸리는 작업을 동기적으로 처리하기엔 아쉽다.

따라서, 메시지 큐를 활용해서 비동기적으로 처리하는데에 Producer-Consumer 패턴이 사용된다.

  • Producer는 처리할 작업을 메시지의 형태로 큐에 저장
  • Consumer는 큐에서 메시지를 소비하여 처리

ex.RabbitMQ, Kafka와 같은 메시지 큐잉 라이브러리



패턴 구현해보기

별개의 개별 스레드로 소비자와 생산자 패턴을 구현해보려고 한다.

  • 각각의 쓰레드는 데이터 큐를 공유한다.
  • 생산자는 계속 데이터를 생산한다.
  • 소비자는 데이터를 소비한다.

공유자원이기에 여기도 동시성에 대한 고민을 해야하나? 추가학습 필요



2-1. 메시지 클래스

생산자가 생산하고, 소비자가 소비할 데이터를 추상화한 클래스

상용 환경에서의 JSON이나 복잡한 객체, 숫자 등이 될 수 있음
나의 경우엔 이미지나 로그데이터로 추후에 적용해보고 싶다.

public class Message {
    private double data;

    // constructors and getter/setters
}



2-2. Message를 적재할 Queue 클래스

생산자가 생산한 메시지는 Queue에 적재되며,
소비자는 메시지를 소비하기 위해 Queue에서 꺼내 소비

public class DataQueue {

	private final Queue<Message> queue = new LinkedList<>();

	// 프로듀서가 큐에 메시지를 적재
    public void add(Message message) {
        queue.add(message);
    }

    // 컨슈머가 큐에서 메시지를 꺼내 소비
    public Message poll() {
        Message poppedMessage = queue.poll();
        return poppedMessage;
    }
}
	// etc..



2-3. 메시지를 생산할 생산자 클래스

메시지의 생산은 Producer 클래스를 통해 생성되어 큐에 적재됨

public class Producer implements Runnable {

	private final DataQueue dataQueue;

	public Producer(DataQueue dataQueue) {
		super();
		this.dataQueue = dataQueue;
	}

	public void run() {
		produce();
	}

	public void produce() {
		for (int i = 0; i < 5; i++) {
			Message message = produceMessage();

			dataQueue.add(message);

			// 현실성을 위해 랜덤 시간동안 지연
			sleep((long)(Math.random() * 100));
		}
	}

	private Message produceMessage() {
		Message message = new Message(generateUniqueString());

		System.out.println(String.format("[%s] 메시지 생산: %s%n", Thread.currentThread().getName(), message.getData()));

		return message;
	}

	private String generateUniqueString() {
		// 현재 시간으로부터 고유한 문자열 생성
		var sdf = new SimpleDateFormat("HHmmss");
		String timePart = sdf.format(new Date());

		// 뒤에 1~2개의 랜덤 대문자 알파벳 추가
		Random rand = new Random();
		int randomLength = rand.nextInt(2) + 1;
		StringBuilder randomChars = new StringBuilder();

		for (int i = 0; i < randomLength; i++) {
			char randomChar = (char)('A' + rand.nextInt(26)); // A-Z 범위의 대문자 랜덤 선택
			randomChars.append(randomChar);
		}

		return timePart + "-" + randomChars.toString(); // ex. "093410-AB"
	}
}



2-4. 메시지를 소비할 소비자 클래스

메시지의 소비는 Consumer 클래스를 통해 소비됨

public class Consumer implements Runnable {
	private final DataQueue dataQueue;

	public Consumer(DataQueue dataQueue) {
		super();
		this.dataQueue = dataQueue;
	}

	@Override
	public void run() {
		consume();
	}

	private void consume() {
		for (int i = 0; i < 5; i++) {
			Message message = dataQueue.poll();
			consumeMessage(message); // 메시지 소비

			// 현실성을 위해 랜덤 시간동안 지연
			ThreadUtil.sleep((long) (Math.random() * 100));
		}
	}

	private void consumeMessage(Message message) {
		System.out.println(String.format("  [%s] 메시지 소비: %s%n",
			Thread.currentThread().getName(), message.getData()));
	}
}



동시성 이슈 해결


우리가 구현한 Consumer-Producer의 문제점이 뭘까?
1. Consumer가 DataQueue가 비어있어도 Data를 소비하려고 한다.
2. Producer가 DataQueue가 꽉차있어도 Data를 생산하려고 한다.

해결법은? Consumer는 소비할 데이터가 있을때까지 wait 해야한다. Producer는 dataqueue의 자리가 생길때까지 wait해야 한다.

  • consumer가 data가 생길때까지 wait해주고, Producer가 data을 생산할 때 notify로 깨워준다.
  • Producer가 data를 생산할 자리가 생길때까지 wait하고, Consumer가 data를 소비할때 notify로 깨워준다.



코드

public class DataQueue {
	private final Queue<Message> queue = new LinkedList<>();
	private final int capacity; // 최대 큐 크기 제한

	public DataQueue(int capacity) {
		this.capacity = capacity;
	}

	// 생산자가 큐에 메시지를 적재
	public synchronized void add(Message message) {
		while (queue.size() >= capacity) { // 큐가 가득 차 있으면 대기
			try {
				System.out.println("Producer 대기 (큐가 가득 참)");
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

		queue.add(message);
		notify(); // 대기 중인 Consumer 스레드들에게 알림
	}

	// 소비자가 큐에서 메시지를 꺼내 소비
	public synchronized Message poll() {
		while (queue.isEmpty()) { // 큐가 비어 있으면 대기
			try {
				System.out.println("Consumer 대기 (큐가 비어있음)");
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

		Message poppedMessage = queue.poll();
		notify(); // 생산자를 깨워서 새 메시지를 추가하도록 함
		return poppedMessage;
	}

	public boolean isEmpty() {
		return queue.isEmpty();
	}
}

Trouble 1 Wait과 Notify를 다른 객체에서 호출하니 정상동작하지않음.

  • 문제상황: 처음엔 Consumer 클래스의 consume 함수에서 Message를 꺼내기전에 isEmpty()를 통해 상태를 체크하고 비어있다면 wait하게 해주었다. 하지만 이렇게 하니까 notify를 선언해도 쓰레드가 wait상태에서 깨어나지 않았다.

  • 원인: wait과 notify는 같은 모니터락을 공유해야한다. 서로 다른 객체에서 wait과 notify를 호출하면 서로 다른 상태값(모니터락)에 대한 변경을 요청하는것이다.

  • 해결방법: dataQueue에서 add와 poll에서 직접 wait과 notify를 해주니까 정상동작했다.

Trouble 2 : java.lang.IllegalMonitorStateException: current thread is not owner

  • 문제상황: DataQueue의 add와 poll 메서드에서 synchronized 키워드를 붙여주지 않았을 때 발생한 이슈이다.

  • 원인: 자바의 모든 객체는 모니터 락을 소유한다. 쓰레드들은 모니터 락을 이용해 공유자원의 사용을 관리한다.
    synchronized 키워드를 사용하면, 모니터 락을 소유한 쓰레드만 해당 메서드의 진입할 수 있고, 나머지 쓰레드들은 EntrySet에 대기하게 된다.
    우리가 사용한 wait 같은 경우 모니터락을 소유한채로 호출되어야한다.
    wait이 필요한 부분이 공유자원의 소비와 생산 작업이므로 모니터락으로 접근을 제한해주어야한다.
    Synchronized 메서드에서 wait이 호출될 때, 소유한 모니터락을 해제하고 다음 쓰레드에 락을 넘겨준다. 그 후 해당 쓰레드는 wait 상태로 들어가게된다.

  • 그런데, synchronized 키워드를 붙여주지 않으면 모니터를 소유하지않은 채로 wait이 호출되어 어떤 락을 해제해주어야 하는지 알 수 없게된다.
    또, 해당 프로세스가 wait이 끝날때도 모니터를 다시 얻어야하기 때문에 synchronized 키워드를 사용해 모니터를 해당 쓰레드가 소유하게 해주어야한다.

  • 해결방법: Synchronized 키워드를 사용해 모니터를 쓰레드가 소유하게 해주어야한다.

Blocking Queue


  • 위에서 우리가 구현한 Consumer,Producer와 동기화 기법들을 자동으로 적용해주는 Queue이다.
  • Blocking Queue는 인터페이스이므로 알맞은 구현체를 골라서 사용해야 한다.
  • 나는 ArrayBlockingQueue를 사용했다. 이 구현체는 미리 사이즈를 고정으로 선언해두어야한다.
  • Blocking Queue의 특징은 put과 take메서드의 동기화 기법이 적용되어있고, 큐가 비어있으면 take는 대기상태에 들어가고, 큐가 가득차 있으면 put은 대기상태에 들어간다.

사용예시

public class DataQueue {
	private final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(5);

	// 생산자가 큐에 메시지를 적재
	public void add(Message message) {
		try {
			queue.put(message);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	// 소비자가 큐에서 메시지를 꺼내 소비
	public Message poll() {
		Message poppedMessage = null;
		try {
			poppedMessage = queue.take();
			return poppedMessage;
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return poppedMessage;
	}
}

put과 take의 예시코드

put(E e)

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e); 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //  락을 획득 모니터락을 의미하는듯
    try {
        while (count == items.length) //  큐가 가득 차면 대기 메서드내에 자체적으로 대기가 구현되어있음
            notFull.await(); //  notFull 조건 변수로 대기
        enqueue(e); //  실제로 큐에 데이터 추가
    } finally {
        lock.unlock(); //  락 해제
    }
}

우리가 구현했던 방식과 동일하게

  • 락을 휙득해서 스레드가 동시에 접근 못하도록 보호하고
  • 큐가 꽉 차 있다면 락을 해제하고, 대기상태로 들어간다.
  • finally 블럭으로 락을 해제해서 다른 스레드가 접근할 수 있도록 한다.

take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //  락을 획득 (인터럽트 가능)
    try {
        while (count == 0) //  큐가 비어 있으면 대기
            notEmpty.await(); //  notEmpty 조건 변수로 대기
        return dequeue(); //  실제로 큐에서 데이터 꺼내기
    } finally {
        lock.unlock(); //  락 해제
    }
}
  • 마찬가지로, 락을 잡아서 다른 쓰레드가 접근하지 못하도록 보호하고, 큐가 비어있다면 락을 해제하고 대기 상태로 들어간다.
  • finally 블럭으로 락을 해제해서 다른 스레드가 접근 할 수 있도록 해준다.

notFull과 notEmpty란?

  • Java의 condition 인터페이스를 사용한 동기화 기법이다.
  • await(),signal(),signalAll()을 사용하여 특정 조건을 만족할 때까지 스레드를 대기시킴.
  • notFull은 "큐가 가득 차지 않은 상태"를 나타내는 조건
  • notEmpty는 "큐가 비어있지 않는 상태"를 나타내는 조건

한 마디로 자바에서 지원하는 wait과 notify를 보다 세밀하게 제어할 수 있는 고급 동기화 메커니즘이다.



profile
들은것은 잊어버린다 본것은 기억된다 해본것은 내것이 된다

0개의 댓글