Sink는 Reactor 프레임워크에서 프로그래밍 방식으로 리액티브 스트림(Mono/Flux)에 데이터를 발행(emit)하는 데 사용되는 도구이다.
쉽게 말해서 외부 소스에서 발생한 이벤트를 리액티브 스트림 파이프라인으로 흘려보내는 "입구" 역할을 한다.
Flux.interval, WebClient)하거나, 다른 리액티브 소스(ex: Flux.fromIterable, Mono.fromCallable)에서 가져온다.Sinks가 유용하게 사용된다.Sinks는 데이터를 발행(emitting)하는 역할에만 집중한다. 데이터를 소비하거나 변환하는 로직은 Sink에서 얻는 Mono나 Flux를 통해 처리해야 한다.Sinks는 스레드 안정성을 보장하며, tryEmitXXX()같은 메소드를 통해 발행 성공 여부를 즉시 반환하여 발행 중 발생할 수 있는 오류나 백프레셔 상황을 안전하게 처리할 수 있도록 돕는다.Sinks.One, Sinks.Many와 같은 다양한 팩토리 메소드를 통해 단일 값 발행, 다중 값 발행 등 스트림의 종류와 발행 전략(ex: unicast, multicast, replay)을 선택할 수 있다.
Sinks는 발행하려는 데이터의 양과 구독자 수에 따라 크게 두 가지 종류라 나뉜다.
Sinks.One<T>Mono<T>와 연결된다.Sinks.one()Sinks.Many<T>Flux<T>와 연결된다.unicast(): 단 하나의 구독자만 허용한다. 첫 구독자가 연결되면 Sink는 "활성" 상태가 되고 다른 구독자가 연결을 시도하면 오류를 발생시킨다.multicast(): 여러 구독자를 허용한다. 발행되는 데이터는 연결된 모든 구독자에게 전달된다.replay(): 여러 구독자를 허용하며, Sink에 발행된 과거 데이터를 새로 구독하는 구독자에게도 다시 발행한다. 캐싱과 유사한 효과를 가진다.@SpringBootTest
public class SinkTest {
@Test
void sinksManyUnicastOnBackpressure() throws InterruptedException {
System.out.println("--- Unicast Sink Example ---");
// Unicast Sink 생성: 단 하나의 구독자만 허용
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
// 첫 번째 구독자 연결
unicastSink.asFlux()
.subscribe(
data -> System.out.println("구독자 1 (Unicast): " + data),
error -> System.err.println("구독자 1 (Unicast) 에러: " + error.getMessage()),
() -> System.out.println("구독자 1 (Unicast) 완료")
);
// 데이터 발행
unicastSink.tryEmitNext("데이터 A");
unicastSink.tryEmitNext("데이터 B");
// 두 번째 구독자 연결 시도 (에러 발생 예상)
try {
unicastSink.asFlux()
.subscribe(
data -> System.out.println("구독자 2 (Unicast): " + data),
error -> System.err.println("구독자 2 (Unicast) 에러: " + error.getMessage()),
() -> System.out.println("구독자 2 (Unicast) 완료")
);
} catch (IllegalStateException e) {
System.err.println("두 번째 구독자 연결 시도 실패: " + e.getMessage());
}
unicastSink.tryEmitNext("데이터 C"); // 여전히 첫 번째 구독자에게만 발행
Thread.sleep(100); // 비동기 처리를 위한 대기
unicastSink.tryEmitComplete(); // 스트림 완료
System.out.println("Unicast Sink 완료.");
Thread.sleep(100);
}
@Test
void sinkManyMulticastOnBackpressure() throws InterruptedException {
System.out.println("\n--- Multicast Sink Example ---");
// Multicast Sink 생성: 여러 구독자 허용
Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
// 첫 번째 구독자 연결
multicastSink.asFlux()
.subscribe(
data -> System.out.println("구독자 1 (Multicast): " + data),
error -> System.err.println("구독자 1 (Multicast) 에러: " + error.getMessage()),
() -> System.out.println("구독자 1 (Multicast) 완료")
);
// 구독 전에 데이터 발행
multicastSink.tryEmitNext("사전 발행 데이터 X");
// 데이터 발행
multicastSink.tryEmitNext("데이터 D");
multicastSink.tryEmitNext("데이터 E");
Thread.sleep(50); // 구독자 2 연결 전 발행 데이터 수신 대기
// 두 번째 구독자 연결 (이전 데이터는 받지 못함)
multicastSink.asFlux()
.subscribe(
data -> System.out.println("구독자 2 (Multicast): " + data),
error -> System.err.println("구독자 2 (Multicast) 에러: " + error.getMessage()),
() -> System.out.println("구독자 2 (Multicast) 완료")
);
// 두 번째 구독자 연결 후에 데이터 발행
multicastSink.tryEmitNext("데이터 F");
multicastSink.tryEmitNext("데이터 G");
Thread.sleep(100);
multicastSink.tryEmitComplete(); // 스트림 완료
System.out.println("Multicast Sink 완료.");
Thread.sleep(100);
}
@Test
void sinksManyReplayLatest() throws InterruptedException {
System.out.println("\n--- Replay Sink Example (latest) ---");
// Replay Sink (latest): 마지막 발행된 데이터만 재발행
Sinks.Many<String> replayLatestSink = Sinks.many().replay().latest();
// 데이터 발행 (구독자 연결 전)
replayLatestSink.tryEmitNext("초기 데이터 1");
replayLatestSink.tryEmitNext("초기 데이터 2"); // 마지막 데이터
// 첫 번째 구독자 연결 (마지막 데이터 2만 받음)
replayLatestSink.asFlux()
.subscribe(
data -> System.out.println("구독자 1 (Replay Latest): " + data),
error -> System.err.println("구독자 1 (Replay Latest) 에러: " + error.getMessage()),
() -> System.out.println("구독자 1 (Replay Latest) 완료")
);
Thread.sleep(50);
// 추가 데이터 발행
replayLatestSink.tryEmitNext("데이터 H");
replayLatestSink.tryEmitNext("데이터 I"); // 이 시점의 마지막 데이터
Thread.sleep(50);
// 두 번째 구독자 연결 (데이터 H, I를 받음)
replayLatestSink.asFlux()
.subscribe(
data -> System.out.println("구독자 2 (Replay Latest): " + data),
error -> System.err.println("구독자 2 (Replay Latest) 에러: " + error.getMessage()),
() -> System.out.println("구독자 2 (Replay Latest) 완료")
);
Thread.sleep(100);
replayLatestSink.tryEmitComplete(); // 스트림 완료
System.out.println("Replay Latest Sink 완료.");
Thread.sleep(100);
}
@Test
void sinksManyReplayAll() throws InterruptedException {
System.out.println("\n--- Replay Sink Example (all) ---");
// Replay Sink (all): 모든 과거 데이터 재발행
Sinks.Many<String> replayAllSink = Sinks.many().replay().all();
// 데이터 발행 (구독자 연결 전)
replayAllSink.tryEmitNext("모든 초기 데이터 X");
replayAllSink.tryEmitNext("모든 초기 데이터 Y");
// 첫 번째 구독자 연결 (X, Y 모두 받음)
replayAllSink.asFlux()
.subscribe(
data -> System.out.println("구독자 1 (Replay All): " + data),
error -> System.err.println("구독자 1 (Replay All) 에러: " + error.getMessage()),
() -> System.out.println("구독자 1 (Replay All) 완료")
);
Thread.sleep(50);
// 추가 데이터 발행
replayAllSink.tryEmitNext("모든 데이터 Z");
Thread.sleep(50);
// 두 번째 구독자 연결 (X, Y, Z 모두 받음)
replayAllSink.asFlux()
.subscribe(
data -> System.out.println("구독자 2 (Replay All): " + data),
error -> System.err.println("구독자 2 (Replay All) 에러: " + error.getMessage()),
() -> System.out.println("구독자 2 (Replay All) 완료")
);
Thread.sleep(100);
replayAllSink.tryEmitComplete(); // 스트림 완료
System.out.println("Replay All Sink 완료.");
Thread.sleep(100);
}
}
Sinks.Many를 사용하여 외부 이벤트를 리액티브 스트림으로 변환하는 예제이다.
package com.webfluxpractice;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
@SpringBootTest
public class SinkTest {
@Test
void sink() throws InterruptedException {
// 1. Sinks.Many 생성 (여러 구독자를 허용하는 Multicast Sink)
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
// 2. Sink에서 Flux를 얻어 구독자 연결
// 이 Flux는 sink를 통해 데이터가 발행될 때마다 이를 수신합니다.
sink.asFlux()
.subscribe(
data -> System.out.println("구독자 1: " + data),
error -> System.err.println("구독자 1 에러: " + error.getMessage()),
() -> System.out.println("구독자 1 완료")
);
sink.asFlux()
.subscribe(
data -> System.out.println("구독자 2: " + data),
error -> System.err.println("구독자 2 에러: " + error.getMessage()),
() -> System.out.println("구독자 2 완료")
);
// 3. 외부 이벤트(가상)를 Sink를 통해 발행
System.out.println("데이터 발행 시작...");
EmitResult result1 = sink.tryEmitNext("Hello"); // 데이터 발행 시도
System.out.println("Emit 'Hello' result: " + result1);
Thread.sleep(100); // 비동기 처리를 위한 짧은 대기
EmitResult result2 = sink.tryEmitNext("World");
System.out.println("Emit 'World' result: " + result2);
Thread.sleep(100);
// 4. 모든 데이터 발행이 끝났음을 알리고 스트림 완료
EmitResult completeResult = sink.tryEmitComplete();
System.out.println("Emit complete result: " + completeResult);
// 완료된 Sink에 데이터를 다시 발행하면 실패합니다.
EmitResult failResult = sink.tryEmitNext("This will fail");
System.out.println("Emit 'This will fail' result: " + failResult); // EXPECTED: FAIL_TERMINATED
Thread.sleep(100); // 모든 구독자의 완료 메시지 출력을 위한 대기
}
}