BlockingQueue

sungs·2025년 7월 16일

자바

목록 보기
43/95

BlockingQueue

생산자-소비자 문제를 해결하기 위해 도입된 큐. 특정 조건에 따라 스레드를 대기시키거나 boolean을 반환하거나 할 수 있다.
생산자가 버퍼에 데이터를 집어넣거나 소비자가 데이터를 소비하는 걸 좀 더 편리하게 만들어 준다.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom; // 임의의 숫자를 생성하기 위함
import java.util.concurrent.TimeUnit; // 시간 단위를 사용하기 위함

public class BlockingQueueExample {

    public static void main(String[] args) {
        // 용량이 5인 ArrayBlockingQueue를 생성합니다.
        // 큐가 가득 차면 put 메서드는 블로킹되고, 큐가 비면 take 메서드는 블로킹됩니다.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // 생산자 스레드 시작
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int data = ThreadLocalRandom.current().nextInt(100); // 0-99 사이의 임의의 숫자 생성
                    System.out.println("생산자: " + data + "를 큐에 넣으려고 합니다.");
                    // 큐가 가득 차면 블로킹됩니다.
                    queue.put(data);
                    System.out.println("생산자: " + data + "를 큐에 넣었습니다. 현재 큐 크기: " + queue.size());
                    Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); // 100ms ~ 500ms 대기
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 인터럽트 상태 복원
                System.out.println("생산자 스레드 인터럽트됨.");
            }
        });

        // 소비자 스레드 시작
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("소비자: 큐에서 데이터를 가져오려고 합니다.");
                    // 큐가 비어 있으면 블로킹됩니다. 최대 2초까지 기다립니다.
                    Integer data = queue.poll(2, TimeUnit.SECONDS); // 2초 타임아웃 설정
                    if (data != null) {
                        System.out.println("소비자: " + data + "를 큐에서 가져왔습니다. 현재 큐 크기: " + queue.size());
                    } else {
                        System.out.println("소비자: 2초 동안 큐에서 데이터를 가져오지 못했습니다. 큐가 비어있는 것 같습니다.");
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(300, 800)); // 300ms ~ 800ms 대기
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 인터럽트 상태 복원
                System.out.println("소비자 스레드 인터럽트됨.");
            }
        });

        producerThread.start();
        consumerThread.start();

        // 스레드들이 종료될 때까지 대기
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("메인 스레드 인터럽트됨.");
        }

        System.out.println("\n모든 스레드 작업 완료. 최종 큐 상태: " + queue);
    }
}

BlockingQueue 인터페이스 구현체

  • ArrayBlockingQueue
    : 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
  • LinkedBlockingQueue
    : 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 또는 무한하게 사용할 수
    도 있다.
  • BlockingDeque
    : 동시성 자료 구조.

기능

Queue가 가득 찼을 때 수행할 작업을 정할 수 있다.

  • 예외 던지기
  • 대기하지 않고 즉시 false를 반환하기
  • 대기하기
  • 일정 시간 동안 대기하기

각각 작업마다 BlockingQueue가 메서드를 제공한다.

대기하지 않고 즉시 false 반환

  • offer(e): 요소를 큐에 추가한다. 찼으면 false를 반환한다.
  • poll(): 큐에서 요소를 제거한다. 비어있으면 null을 반환한다.
  • peek(): 머리 요소를 제거하지 않고 반환한다. 비어 있으면 null을 반환한다.

예외 던지고 처리하기

  • add(e): 요소를 큐에 추가한다. 찼으면 IllegalStateException 예외를 던진다.

  • remove(): 요소를 큐에서 제거한다. 비어있으면 NoSuchElementException 예외를 던진
    다.

  • element(): 머리 요소를 반환하지만 제거하지는 않는다. NoSuchElementException
    예외를 던진다.

    대기하기

  • put(e): 요소를 큐에 추가한다. 찼으면 큐가 빌 때까지 waiting 상태로 대기한다.

  • take(): 요소를 제거한다. 비어있으면 요소가 들어올 때까지 waiting 상태로 대기한다.

    일정 시간 동안 대기하기

  • offer(e, time, unit): 요소에 큐를 추가한다. 일정 시간동안 대기한 다음 여전히 큐가 안 비면 false를 반환한다.

  • poll(e, time, unit): 요소에 큐를 제거한다. 일정 시간동안 대기한 다음 여전히 큐에 요소가 안 들어오면 null을 반환한다.

    여러 가지 기능이 있으므로 객체를 만들어 준 다음 자유롭게 사용하면 된다.

profile
앱 개발 공부 중

0개의 댓글