메시지큐 구현하기

Haechan Kim·2024년 7월 11일
0

Spring

목록 보기
67/68

정해진 시간마다 알림 메시지을 보내는 기능을 구현하자.

메시지 생성과 메시지 전송 스레드를 각각 구분해 작업을 분리 -> 메시지 큐를 직접 구현해보자.

메시지 큐와 메시지 컨슈머

메시지 컨슈머가 메시지 큐에서 메시지 꺼내와서 처리.
큐에 메시지 들어있는지 매 초마다 검사 (@Scheduled).
메시지 처리 스레드풀 작동 안한다면 스레드풀 생성해 메시지 처리.
스레드풀을 메시지 처리할때만 생성시키고, 처리 후 소멸.

메시지 큐

메시지 큐 구현은 ConcurrentLinkedQueue를 사용한다.
-> 동시성 처리를 위한 컬렉션.
-> Lock-free 알고리즘 구현한 클래스. -> 여러 스레드 동시 접근 시, 락 없이 최소 하나의 스레드가 안전하게 요소 잡도록 함.

Future

비동기 연산 위한 인터페이스.
비동기 처리 완료 확인, 처리 완료 대기, 처리 결과 반환 메서드 제공.
Future 이용하면 멀티스레드 환경에서 처리된 데이터 다른 스레드에 전달 가능.
Future는 내부적으로 Thread Safe 하기 때문에 synchronized block 사용 안해도 됨.

  • Future의 불편한 점
    - 예외 처리 API 제공 X.
    - 여러 Future 조합 어려움.
    - Future에서 반환하는 결과값을 사용하는 작업은 get() 뒤에 와야 함.
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> "hello!");

// get 하기 전엔 아무것도 못함.
future.get();

Future는 오직 get 호출로만 작업 완료 가능.
get은 블로킹 호출
-> 작업 완료까지 기다렸다 최종 결과 얻음. (Future 객체를 지연 완료 객체 pending Completion라고 부른다.)
-> 비동기 작업 응답에 추가 작업 적합 X.

CompletableFuture

Future의 문제들을 해결한 클래스.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	...

CompletionStage는 결국 계산이 완료될 것이라는 약속의 의미.
계산의 완료는 여러 단계로 이어질 수 있고, 각 단계에서 발생한 에러 관리 가능.

  • runAsync
    runAsync는 결과 값 담지않음.
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> 
	System.out.print("hello!");
)
cf.join(); // hello!
  • supplyAsync
    반환값이 있는 경우 비동기 작업 실행.
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "hello!");
System.out.print(cf.join()); // hello!

메시지 핸들러

메시지 처리 역할.

...

[참고]

0개의 댓글