
Reactor는 대부분 데이터가 안에서 바깥으로 흘러가는 구조(Pull)를 따르지만, 때로는 외부에서 임의로 데이터를 Push해야 할 때가 있다. 예를 들어, 외부에서 전달된 콜백 결과나 수동으로 제어하는 신호를 스트림에 넣어야 하는 상황이다. 이럴 때 사용하는 것이 바로 Sinks다.
Sinks는 외부에서 프로그래밍 방식으로 시그널을 밀어 넣을 수 있는 Reactive 스트림의 진입점 역할을 한다.
기존에는 generate, create와 같은 오퍼레이터로 동작했지만, 이는 싱글 스레드 기반으로 제한이 많았다.
Sinks는 멀티 스레드 환경에서도 안전하게 사용할 수 있으며, 시그널 처리 목적에 맞게 다양한 구현체를 제공한다.
Sinks.One: 단 한 번의 emitSinks.One은 최대 한 번만 데이터를 emit할 수 있다.
내부적으로 Mono로 변환되며, 여러 구독자에게 동일한 단일 값을 전송할 수 있다.
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();
mono.subscribe(data -> System.out.println("Subscriber1: " + data));
mono.subscribe(data -> System.out.println("Subscriber2: " + data));
sink.tryEmitValue("Hello Reactor!");
sink.tryEmitValue("Dropped!"); // 무시됨
출력 결과:
Subscriber1: Hello Reactor!
Subscriber2: Hello Reactor!
→ 두 번째 emit은 무시되며, 구독자는 동일한 값을 공유한다.
→ 비동기 작업의 결과 전파나 단일 응답 처리에 적합하다.
Sinks.Many: 여러 개 데이터 발행Sinks.Many는 Flux 기반으로 여러 값을 emit할 수 있다.
emit 방식과 구독자 모델에 따라 아래 세 가지로 나뉜다.
unicast(): 단일 구독자 전용onBackpressureBuffer()를 지정해야 함Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> flux = sink.asFlux();
flux.subscribe(data -> System.out.println("# Subscriber1: " + data));
sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
// 두 번째 구독자는 에러 발생
flux.subscribe(data -> System.out.println("# Subscriber2: " + data));
출력 결과:
# Subscriber1: 1
# Subscriber1: 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
→ 단방향 통신, Queue-like 구조에 적합
multicast(): 실시간 스트리밍 전송Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> flux = sink.asFlux();
flux.subscribe(data -> System.out.println("# Subscriber1: " + data));
sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
flux.subscribe(data -> System.out.println("# Subscriber2: " + data));
sink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
출력 결과:
# Subscriber1: 1
# Subscriber1: 2
# Subscriber1: 3
# Subscriber2: 3
→ 실시간 알림, 브로드캐스트 시나리오에 적합
replay(): 과거 이벤트도 공유limit(n)으로 저장 용량 제어 가능Sinks.Many<Integer> sink = Sinks.many().replay().limit(2);
Flux<Integer> flux = sink.asFlux();
sink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
flux.subscribe(data -> System.out.println("# Subscriber1: " + data));
sink.emitNext(4, Sinks.EmitFailureHandler.FAIL_FAST);
flux.subscribe(data -> System.out.println("# Subscriber2: " + data));
출력 결과:
# Subscriber1: 2
# Subscriber1: 3
# Subscriber1: 4
# Subscriber2: 3
# Subscriber2: 4
→ 최근 이벤트 캐시, 지연 구독 보정 등에 적합
| 유형 | 특징 | 적합한 시나리오 |
|---|---|---|
Sinks.One | 단일 emit, 여러 구독자 가능 | 응답 처리, 완료 알림 |
Sinks.Many.unicast() | 단일 구독자, 선입선출 큐 | 단방향 데이터 처리 |
Sinks.Many.multicast() | 다수 구독자, 실시간 데이터만 수신 | 스트리밍, 알림, 채팅 |
Sinks.Many.replay() | emit 값 저장 후 재전송 | 캐시, 상태 공유, 지연 보정 |
Sinks는 Reactor의 흐름을 외부에서 제어해야 할 때 매우 강력한 도구다.
이벤트가 외부에서 발생하고, 이를 reactive하게 처리하려면 Sinks를 사용하여 스트림과 연결할 수 있다.
프로듀서-컨슈머 구조, 이벤트 브로커, HTTP 콜백 처리 등 다양한 시나리오에서 활용할 수 있으며, 상황에 맞는 Sinks 유형을 선택하는 것이 중요하다.