리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조
- Publisher와 Subscriber의 기능을 모두 지닌 Processor의 향상된 기능을 제공
- Multi Thread 기반
- 스레드 안정성 보장
generate()
Operatorcreate()
Operatorpublic static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# create(): {}", n))
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
/*
15:59:53.213 [boundedElastic-1] INFO - # create(): task 1 result
15:59:53.215 [boundedElastic-1] INFO - # create(): task 2 result
15:59:53.215 [boundedElastic-1] INFO - # create(): task 3 result
15:59:53.215 [boundedElastic-1] INFO - # create(): task 4 result
15:59:53.216 [boundedElastic-1] INFO - # create(): task 5 result
15:59:53.219 [parallel-2] INFO - # map(): task 1 result success!
15:59:53.220 [parallel-2] INFO - # map(): task 2 result success!
15:59:53.220 [parallel-1] INFO - # onNext: task 1 result success!
15:59:53.220 [parallel-2] INFO - # map(): task 3 result success!
15:59:53.220 [parallel-1] INFO - # onNext: task 2 result success!
15:59:53.220 [parallel-2] INFO - # map(): task 4 result success!
15:59:53.220 [parallel-1] INFO - # onNext: task 3 result success!
15:59:53.220 [parallel-2] INFO - # map(): task 5 result success!
15:59:53.220 [parallel-1] INFO - # onNext: task 4 result success!
15:59:53.220 [parallel-1] INFO - # onNext: task 5 result success!
*/
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted: {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
/*
18:17:58.332 [Thread-0] INFO - # emitted: 1
18:17:58.431 [Thread-1] INFO - # emitted: 2
18:17:58.536 [Thread-2] INFO - # emitted: 3
18:17:58.641 [Thread-3] INFO - # emitted: 4
18:17:58.743 [Thread-4] INFO - # emitted: 5
18:17:58.909 [parallel-2] INFO - # map(): task 1 result success!
18:17:58.909 [parallel-2] INFO - # map(): task 2 result success!
18:17:58.909 [parallel-1] INFO - # onNext: task 1 result success!
18:17:58.909 [parallel-2] INFO - # map(): task 3 result success!
18:17:58.910 [parallel-2] INFO - # map(): task 4 result success!
18:17:58.910 [parallel-1] INFO - # onNext: task 2 result success!
18:17:58.910 [parallel-2] INFO - # map(): task 5 result success!
18:17:58.910 [parallel-1] INFO - # onNext: task 3 result success!
18:17:58.910 [parallel-1] INFO - # onNext: task 4 result success!
18:17:58.910 [parallel-1] INFO - # onNext: task 5 result success!
*/
What Is Thread-Safety and How to Achieve It? | Baeldung
Sinks.one()
emitValue()
EmitFailureHandler.FAIL_FAST
public interface EmitFailureHandler {
EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;
boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
}
asMono()
with Mono semantics
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
sinkOne.emitValue("Hi Reactor", FAIL_FAST);
sinkOne.emitValue(null, FAIL_FAST);
mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
/*
22:07:57.893 [main] DEBUG- onNextDropped: Hi Reactor
22:07:57.895 [main] INFO - # Subscriber1 Hello Reactor
22:07:57.896 [main] INFO - # Subscriber2 Hello Reactor
*/
Sinks.many()
ManySpec
을 리턴public interface ManySpec {
UnicastSpec unicast();
MulticastSpec multicast();
MulticastReplaySpec replay();
}
asFlux()
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
unicastSink.emitNext(3, FAIL_FAST);
/* 주석 제거 시, IllegalStateException 발생
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber */
// fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
/*
22:20:11.465 [main] INFO - # Subscriber1: 1
22:20:11.466 [main] INFO - # Subscriber1: 2
22:20:11.466 [main] INFO - # Subscriber1: 3
*/
Sinks.Many<Integer> multicastSink =
Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, FAIL_FAST);
/*
22:34:19.836 [main] INFO - # Subscriber1: 1
22:34:19.837 [main] INFO - # Subscriber1: 2
22:34:19.838 [main] INFO - # Subscriber1: 3
22:34:19.838 [main] INFO - # Subscriber2: 3
*/
replay
)Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
replaySink.emitNext(4, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
/*
22:37:18.104 [main] INFO - # Subscriber1: 2
22:37:18.105 [main] INFO - # Subscriber1: 3
22:37:18.105 [main] INFO - # Subscriber1: 4
22:37:18.105 [main] INFO - # Subscriber2: 3
22:37:18.105 [main] INFO - # Subscriber2: 4
*/