금일은 지난 시간에 이어 Java 의 생산자 소비자 문제를 해결하기 위한 멀티스레드 자료 구조인 BlockingQueue 에 대해 알아보는 시간입니다. 말 그대로 스레드를 차단 할 수 있는 큐를 말합니다.
BlockingQueue 는 인터페이스 이며 스레드 관점에서 보면 큐가 특정 조건이 만족될 때까지 스레드의 작업을 blocking 한다.
Queue 를 상속받으며 데이터 추가, 획득 메서드가 존재한다.
해당 구현체로는 다음과 같다.
수정된 put, take method
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
put() 메서드에 들어가 ArrayBlockkingQueue 의 구현체를 보면 다양한 함수의 기능을 확인 가능합니다.
차이점으로는 lock.lock() 대신에
lock.lockInterruptibly() 을 사용한 점과, 내부 자료 구조의 차이 정도이다
큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.
해당 문제를 해결하기 위해 BlockingQueue
는 다양한 메서드를 제공한다.
Throws Exception - 대기시 예외
Special Value - 대기시 즉시 반환
Blocks - 대기
Times Out - 시간 대기
해당 Method 들은 인터럽트를 제공한다
@Override
public void put(String data) {
try {
// 대기 시간 설정 가능
boolean result = queue.offer(data, 1, TimeUnit.NANOSECONDS);
log("저장 시도 결과 = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
// 대기 시간 설정 가능
return queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
버퍼가 가득차거나 데이터가 없다면 해당 시간 동안 기다리고 없다면 null 을 반환하게 된다.
public BoundedQueueV6_4(int max) {
this.queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
queue.add(data); // java.lang.IllegalStateException: Queue full
}
@Override
public String take() {
return queue.remove(); // java.util.NoSuchElementException
}
해당 method 는 대기 발생시 예외를 발생시킨다.
해당 생산자의 호출은 main method 이며 max 는 capacity 를 의미한다.
BlockingQueue 인터페이스를 바로 상속 받아도 아무 문제가 없다.