Sinks는 Processor의 기능을 개선해 Reactor 3.5.0 버전부터 지원되었다.
Flux, Mono는 onNext와 같은 Signal을 내부적으로 전송해 주는 방식이었고, Sinks는 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있다.
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic()) //작업 처리
.doOnNext(n -> log.info("# create(): {}", n))
.publishOn(Schedulers.parallel()) //result 가공
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel()) //subscriber에게 가공된 결과 전달
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
return "task " + taskNumber + " result";
}
스레드를 지정할 때 subscribeOn, publishOn 구분하는 이유가 뭘까..
위 코드에서 doTask() 메서드가 여러 개의 스레드에서 각각 다른 작업들을 처리한 후, 결과를 반환받으면 문제 발생할 수 있다. 따라서 Sinks를 사용할 수 있다.
Sinks를 이용해 프로그래밍 방식으로 signal을 전송할 수 있는 방법은 Sinks.One
, Sinks.Many
가 있다.
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
sinkOne.emitValue("Hi Reactor", FAIL_FAST); //Drop
mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
unicastSink.emitNext(3, FAIL_FAST);
IllegalStateException
이 발생한다. Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, FAIL_FAST);