생산자 소비자 문제2

황상익·2024년 10월 17일

Inflearn JAVA

목록 보기
51/61

Lock Condition

생산자 스레드가 대기하는 곳과, 소비사 스레드가 대기하는 곳을 나누면 비효율성 문제를 해결!
Lock, ReentrantLock을 사용

//아직 공간을 분리하지 않아 비효율성 문제는 발생
public class BoundedQueueV4 implements BoundedQueue {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition(); //ReentrantLock을 사용하는 스레드가 기다리는 대기 집합

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV4(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();

        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    condition.await(); //condition이라는 곳에 들어가서 대기 WAITING
                    log("[put] 생산자가 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장, signal() 호출");
            condition.signal(); //signal을 주면 대기하던 스레드가 깨어난다.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                    condition.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, signal() 호출");
            condition.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  1. Condition
    ReentrantLock이 스레드가 대기하는 공간.
    lock.newCondition() 메서드를 호출, 스레드 대기 공간 형성

  2. condition.await()
    현재 스레드를 대기상태로 보관
    ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관

  3. condition.signal()
    condition에서 대기중인 스레드를 하나 깨움.


lock은 synchornized에서 사용하는 객체 내부에 있는 모니터 락이 아니라, ReentrantLock을 의미

생산자 소비자 대기 공간 분리

//생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다.
public class BoundedQueueV5 implements BoundedQueue {

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition(); //생산자 대기 공간
    private final Condition consumerCond = lock.newCondition(); //소비자 대기 공간

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV5(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();

        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    producerCond.await(); //condition이라는 곳에 들어가서 대기 WAITING
                    log("[put] 생산자가 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장, consumerCond() 호출");
            consumerCond.signal(); //signal을 주면 대기하던 스레드가 깨어난다.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                    consumerCond.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, producerCond() 호출");
            producerCond.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  1. Condition 분리
    consumerCond : 생산자를 위한 스레드 대기 공간
    producerCond : 소비자를 위한 스레드 대기 공간

  2. put(data) - 생산자 스레드 호출
    producerCond.await() 호출을 통해 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관
    consumerCond.signal() 호출해서 소비자 전용 스레드 대기 공간에 신호 보냄

  3. take() - 소비자 스레드가 호출
    consumerCond.await() : 큐가 비었을 경우 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관
    producerCond.signal() : 소비자가 데이터를 소비한 경우, 큐에 여유공간이 생김. 생산자 전용 스레드 대기 공간에 신호를 보냄

Object.notify() vs Condition.signal()

  • Object.notify
    대기중인 스레드 중 임의의 스레드를 선택해서 깨움, 순서 보장 X
    synchronized block 내에서 모니터 락을 갖고 잇는 스레드가 호출

  • Condition.signal
    대기중인 스레드중 하나를 깨우며, 일반적으로 FIFO 순서로 깨움

스레드 대기

synchronized 대기

락 획득 대기

  • BLOCKED 상태로 락 획득 대기
  • synchronzied 시작할때 락이 없으면 대기
  • 다른 스레드가 synchronized를 빠져 나갈때 대기가 풀리면서 락 획득 시도

wait

  • WAITING 상태로 대기
  • wait()을 호출했을때 스레드 대기 집합에서 대기
  • notify 호출시 빠져나감


개념상으로 보면, 락 대기 집합이 1차 대기소, 스레드 대기 집합이 2차 대기소
2차를 거쳐 1차 대기소까지 나와야 임계영역에서 로직을 수행 할 수 있다.

ReentrantLock 대기

ReentrantLock 락 획득 대기

  • ReentrantLock의 대기 큐에서 관리
  • WAITING 상태로 락 획득 대기
  • lock.lock을 호출시 락이 없다면 대기
  • lock.unlock()을 호출 했을 때 대기가 풀리며 락 획득 시도, 락을 획득하면 대기 큐를 빠르게 빠져나감

await() 대기

  • condition.await()를 호출 했을 때 condition 객체의 스레드 대기 공간에서 관리
  • WAITING 상태로 대기
  • condition.signal()을 호출했을때 condition 객체의 스레드 대기 공간에서 빠져나감

ReentrantLock 또한 2차 대기소

Blocking Queue

데이터 추가 차단

  • 큐가 가득 차면 데이터 추가 작업 put을 시도하는 스레드는 공간이 생길 때 까지 차단
    데이터 획득 차단
  • 큐가 비어있으면, 획득 잡업 take을 시도하는 스레드는 큐에 데이터가 들어올때 까지 차단
BlockingQueue API
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
/

대표적 구현체

  • ArrayBlockingQueue : 배열 기반 구현, 버퍼의 크기가 고정
  • LinkedBlockingQueue : 링크 기반 구현, 버퍼의 크기가 고정. 무한하게 사용
public class BoundedQueueV6_1 implements BoundedQueue {

    private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것

    public BoundedQueueV6_1(int max) {
        queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
    }

    @Override
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

ArrayBlockingQueue.put()

public class ArrayBlockingQueue {
final Object[] items;
int count;
ReentrantLock lock;
Condition notEmpty; //소비자 스레드가 대기하는 condition
Condition notFull; //생산자 스레드가 대기하는 condition
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length) {
notFull.await();
}
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
items[putIndex] = e;
count++;
notEmpty.signal();
}
}
  • ArrayBlockingQueue는 내부에서 ReentrantLock을 사용 -> 생산자 전용 대기실과 , 소비자 전용 대기실이 구현

기능 설명

실무에서 멀티스레드를 사용할 때는 응답성이 중요

큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.
1. 예외를 던진다. 예외를 받아서 처리한다.
2. 대기하지 않는다. 즉시 false를 반환한다.
3. 대기한다.
4. 특정 시간 만큼만 대기한다.

Throws Exception - 대기시 예외

  • add(e) : 저장된 요소를 큐에 추가, 큐가 가득 차면 IllegalStateException 예외를 던짐
  • remove() : 큐에서 요소를 제거 반환, 큐가 비어있으면 NoSuchElementException 을 던짐
  • element() : 큐의 머리 요소를 반환, 요소를 큐에서 제거하지는 않음, 큐가 비었다면 NoSuchElementException 를 반환

Special Value - 대기시 즉시 반환

  • offer(e) : 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면 false 를 반환
  • poll() : 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 null 을 반환
  • peek() : 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면 null 을 반환.

Blocks - 대기

  • put(e) : 지정된 요소를 큐에 추가할 때까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기
  • take() : 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기
  • Examine (관찰) : 해당 사항 없음

Times Out - 시간 대기

  • offer(e, time, unit) : 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간이 초과되면 false 를 반환한다.
  • poll(time, unit) : 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다리다가 시간이 초과되면 null 을 반환한다.
  • Examine (관찰) : 해당 사항 없음.
public class BoundedQueueV6_2 implements BoundedQueue {

    private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것

    public BoundedQueueV6_2(int max) {
        queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
    }

    @Override
    public void put(String data) {
        boolean rst = queue.offer(data);
        // 버퍼가 가득 차있는 경우 데이터를 추가하지 않고 바로 false
        log("저장 시도 결과 = " + rst);
    }

    @Override
    public String take() {
        // 버퍼가 비어있는 경우 null
        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
public class BoundedQueueV6_3 implements BoundedQueue {

    private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것

    public BoundedQueueV6_3(int max) {
        queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
    }

    @Override
    public void put(String data) {
        try {
            //대기 시간 설정 가능
            boolean rst = queue.offer(data, 1, TimeUnit.NANOSECONDS);
            log("저장 시도 결과 = " + rst);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            //대기 시간 설정 가능
            return queue.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
public class BoundedQueueV6_4 implements BoundedQueue {

    private BlockingQueue<String> queue; //스레드를 강제로 block 시킬 수 있는 것

    public BoundedQueueV6_4(int max) {
        queue = new ArrayBlockingQueue<String>(max); //size를 넣어줘야 함
    }

    @Override
    public void put(String data) {
        queue.add(data); //IllegalStateException : Queue full
    }

    @Override
    public String take() {
        return queue.remove(); //NoSuchElementException
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
profile
개발자를 향해 가는 중입니다~! 항상 겸손

0개의 댓글