[Project Reactor] 9. Reactor Sinks: 외부 이벤트를 리액티브 스트림에 푸시하는 방법

y001·2025년 5월 1일

Reactive Programming

목록 보기
9/30
post-thumbnail

Reactor는 대부분 데이터가 안에서 바깥으로 흘러가는 구조(Pull)를 따르지만, 때로는 외부에서 임의로 데이터를 Push해야 할 때가 있다. 예를 들어, 외부에서 전달된 콜백 결과나 수동으로 제어하는 신호를 스트림에 넣어야 하는 상황이다. 이럴 때 사용하는 것이 바로 Sinks다.


1. Sinks란?

Sinks는 외부에서 프로그래밍 방식으로 시그널을 밀어 넣을 수 있는 Reactive 스트림의 진입점 역할을 한다.
기존에는 generate, create와 같은 오퍼레이터로 동작했지만, 이는 싱글 스레드 기반으로 제한이 많았다.
Sinks는 멀티 스레드 환경에서도 안전하게 사용할 수 있으며, 시그널 처리 목적에 맞게 다양한 구현체를 제공한다.


2. Sinks 종류와 특성

2.1 Sinks.One: 단 한 번의 emit

Sinks.One은 최대 한 번만 데이터를 emit할 수 있다.
내부적으로 Mono로 변환되며, 여러 구독자에게 동일한 단일 값을 전송할 수 있다.

Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();

mono.subscribe(data -> System.out.println("Subscriber1: " + data));
mono.subscribe(data -> System.out.println("Subscriber2: " + data));

sink.tryEmitValue("Hello Reactor!");
sink.tryEmitValue("Dropped!"); // 무시됨

출력 결과:

Subscriber1: Hello Reactor!
Subscriber2: Hello Reactor!

→ 두 번째 emit은 무시되며, 구독자는 동일한 값을 공유한다.
→ 비동기 작업의 결과 전파단일 응답 처리에 적합하다.


2.2 Sinks.Many: 여러 개 데이터 발행

Sinks.ManyFlux 기반으로 여러 값을 emit할 수 있다.
emit 방식과 구독자 모델에 따라 아래 세 가지로 나뉜다.


(1) unicast(): 단일 구독자 전용

  • 한 명의 구독자만 허용
  • emit 전에 구독이 먼저 연결돼야 함
  • 반드시 onBackpressureBuffer()를 지정해야 함
Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> flux = sink.asFlux();

flux.subscribe(data -> System.out.println("# Subscriber1: " + data));

sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);

// 두 번째 구독자는 에러 발생
flux.subscribe(data -> System.out.println("# Subscriber2: " + data));

출력 결과:

# Subscriber1: 1
# Subscriber1: 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber

단방향 통신, Queue-like 구조에 적합


(2) multicast(): 실시간 스트리밍 전송

  • 여러 구독자 허용
  • Hot Publisher처럼 동작
  • 구독 시점 이후의 데이터만 수신 가능
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> flux = sink.asFlux();

flux.subscribe(data -> System.out.println("# Subscriber1: " + data));

sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);

flux.subscribe(data -> System.out.println("# Subscriber2: " + data));

sink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);

출력 결과:

# Subscriber1: 1
# Subscriber1: 2
# Subscriber1: 3
# Subscriber2: 3

실시간 알림, 브로드캐스트 시나리오에 적합


(3) replay(): 과거 이벤트도 공유

  • emit된 값을 메모리에 저장하고 새 구독자에게 재전달
  • limit(n)으로 저장 용량 제어 가능
Sinks.Many<Integer> sink = Sinks.many().replay().limit(2);
Flux<Integer> flux = sink.asFlux();

sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);

flux.subscribe(data -> System.out.println("# Subscriber1: " + data));

sink.emitNext(4, Sinks.EmitFailureHandler.FAIL_FAST);

flux.subscribe(data -> System.out.println("# Subscriber2: " + data));

출력 결과:

# Subscriber1: 2
# Subscriber1: 3
# Subscriber1: 4
# Subscriber2: 3
# Subscriber2: 4

최근 이벤트 캐시, 지연 구독 보정 등에 적합


3. 정리

유형특징적합한 시나리오
Sinks.One단일 emit, 여러 구독자 가능응답 처리, 완료 알림
Sinks.Many.unicast()단일 구독자, 선입선출 큐단방향 데이터 처리
Sinks.Many.multicast()다수 구독자, 실시간 데이터만 수신스트리밍, 알림, 채팅
Sinks.Many.replay()emit 값 저장 후 재전송캐시, 상태 공유, 지연 보정

마무리

Sinks는 Reactor의 흐름을 외부에서 제어해야 할 때 매우 강력한 도구다.
이벤트가 외부에서 발생하고, 이를 reactive하게 처리하려면 Sinks를 사용하여 스트림과 연결할 수 있다.
프로듀서-컨슈머 구조, 이벤트 브로커, HTTP 콜백 처리 등 다양한 시나리오에서 활용할 수 있으며, 상황에 맞는 Sinks 유형을 선택하는 것이 중요하다.

0개의 댓글