Reactor 발행자와 구독자는 쓰레드를 생성하지 않는다. 그러나 이전 글에서 확인했듯이, 이러한 행동을 변경할 수 있는 연산자가 존재한다. 이전 글에서는 delay 연산자가 Reactor chain의 메인 쓰레드가 아닌 다른 곳으로 실행을 바꾸는 것을 보았다. 그러나 실행을 변경하기 위해서 delay 나 timeout을 사용하지는 않는다.
Reactor는 체인 실행 변경을 하기 위한 목적으로 publishOn
과 subscribeOn
연산자를 제공한다. 두 연산자 모두 반응형 chain의 실행 흐름을 변경할 수 있다.
publishOn
연산자는 실행 chain에 설정된 포인트에 기반하여 발행자로 부터 이벤트를 가로채서 나머지 체인의 다른 scheduler로 보낸다. 결과적으로, 반응형 chain 의 downstream 쓰레드 context 를 변경한다.
@Test
void publishOn() throws InterruptedException {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
print("Generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.publishOn(Schedulers.single())
.filter(x -> {
return x < 50;
})
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing "))
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
Thread.sleep
으로 메인 쓰레드를 정지시켰다.single-1
쓰레드로 넘기는 것을 확인할 수 있다.[Test worker] Generating next of 1
[single-1] Executing Filter
[single-1] Next Value is 0
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[Test worker] Generating next of 3
[Test worker] Generating next of 5
[Test worker] Generating next of 8
[single-1] Sub received : 0
[Test worker] Generating next of 13
[single-1] Executing Filter
[Test worker] Generating next of 21
[single-1] Next Value is 1
[Test worker] Generating next of 34
[single-1] Sub received : 1
subscribeOn
연산자는 execution chain에서 이벤트를 가로채서 complete chain 을 위해 다른 스케줄러로 보낸다. 여기서 중요한 것은 해당 연산자가 complete chain 을 위하여 실행 컨텍스트르 바꾼다는 것이다. 이는 downstream chain의 실행만 바꾸는 publishOn 연산자와 차이가 있다.
@Test
void subscribeOn() throws InterruptedException {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
print("Generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing "))
.subscribeOn(Schedulers.single())
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
[single-1] Executing Filter
[single-1] Next value is 0
[single-1] Sub received : 0
[single-1] Generating next of 1
[single-1] Executing Filter
[single-1] Next value is 1
[single-1] Sub received : 1
[single-1] Generating next of 1
[single-1] Executing Filter
[single-1] Next value is 1
[single-1] Sub received : 1
[single-1] Generating next of 2
[single-1] Executing Filter
[single-1] Next value is 2
[single-1] Sub received : 2
[single-1] Generating next of 3
[single-1] Executing Filter
[single-1] Next value is 3
[single-1] Sub received : 3
[single-1] Generating next of 5
[single-1] Executing Filter
[single-1] Next value is 5
[single-1] Sub received : 5
...
[single-1] Generating next of 2880067194370816120
[single-1] Executing Filter
[single-1] Generating next of 4660046610375530309
[single-1] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Executing Filter
[single-1] Generating next of -6246583658587674878
[single-1] Closing
[single-1] Generating next of 1293530146158671551
같은 chain에 대하여 subscribeOn
과 publishOn
을 같이 설정할 수 있다.
subscribeOn
연산자는 완전한 reactive chain 을 설정된 스케줄러에서 실행한다.publishOn
연산자는 downstream chain을 바꾸어 명시된 스케줄러에서 처리하도록 한다.@Test
void publishOnAndSubscribeOn() throws InterruptedException {
fibonacciGenerator
.publishOn(Schedulers.parallel())
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.doOnNext(x -> print("Next Value is " + x))
.doFinally(x -> print("Closing "))
.subscribeOn(Schedulers.single())
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
single-1
쓰레드에서 생성한다. 나머지 체인은 publishOn 메서드에서 설정한 parallel
쓰레드에서 실행된다.[single-1] Generating next of 1
[single-1] Generating next of 1
[single-1] Generating next of 2
[single-1] Generating next of 3
[parallel-1] Executing Filter
[single-1] Generating next of 5
[parallel-1] Next Value is 0
[single-1] Generating next of 8
[single-1] Generating next of 13
[parallel-1] Sub received : 0
[parallel-1] Executing Filter
[parallel-1] Next Value is 1
[single-1] Generating next of 21
[parallel-1] Sub received : 1
[parallel-1] Executing Filter
[single-1] Generating next of 34
[parallel-1] Next Value is 1
[parallel-1] Sub received : 1
[parallel-1] Executing Filter
[parallel-1] Next Value is 2
[parallel-1] Sub received : 2
[parallel-1] Executing Filter
...
[single-1] Generating next of 1779979416004714189
[single-1] Generating next of 2880067194370816120
[single-1] Generating next of 4660046610375530309
[parallel-1] Executing Filter
[parallel-1] Executing Filter
[parallel-1] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Generating next of -6246583658587674878
[single-1] Generating next of 1293530146158671551
[parallel-1] Executing Filter
[parallel-1] Closing
Reactor는 ParallelFlux
를 제공하여 스트림은 round-robin 방식으로 다수의 스트림으로 분할하는 기능을 제공합니다. ParallelFlux
는 Flux 에 parallel 연산자를 사용하여 만들 수 있습니다.
기본적으로, 스트림은 현재 가용 가능한 CPU 코어의 수 만큼 나누어집니다. ParallelFlux
는 스트림을 분할할 뿐 실행 모델을 변경하지 않습니다. 분할된 스트림은 runOn
연산자를 사용하여 parallel로 처리할 수 있습니다. 이는 publishOn
연산자와 비슷하게 downstream 에 명시된 스케줄러에서 동작을 수행합니다.
sequential()
연산자를 이용하여 다시 Flux로 변환할 수 있습니다. 이렇게 변환하면 다시 doFinally Hook 을 사용할 수 있습니다.@Test
void parallel() throws InterruptedException {
fibonacciGenerator
.parallel()
.runOn(Schedulers.parallel())
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.doOnNext(x -> print("Next value is " + x))
.sequential()
.doFinally(x -> print("Closing "))
.subscribeOn(Schedulers.single())
.subscribe(x -> print("sub received : " + x));
Thread.sleep(500);
}
[single-1] Generating next of 1
[single-1] Generating next of 1
[parallel-2] Executing Filter
[parallel-1] Executing Filter
[parallel-2] Next value is 1
[parallel-1] Next value is 0
[parallel-3] Executing Filter
[parallel-3] Next value is 1
[single-1] Generating next of 2
[parallel-2] Sub received : 1
[single-1] Generating next of 3
[parallel-4] Executing Filter
[single-1] Generating next of 5
[parallel-4] Next value is 2
[single-1] Generating next of 8
[parallel-2] Sub received : 0
[parallel-2] Sub received : 1
[parallel-7] Executing Filter
[parallel-7] Next value is 8
[single-1] Generating next of 13
[parallel-6] Executing Filter
[parallel-8] Executing Filter
[parallel-5] Executing Filter
[parallel-5] Next value is 3
[parallel-6] Next value is 5
[single-1] Generating next of 21
[single-1] Generating next of 34
[parallel-2] Sub received : 2
[parallel-2] Sub received : 3
[single-1] Generating next of 55
...
[single-1] Generating next of 1779979416004714189
[single-1] Generating next of 2880067194370816120
[single-1] Generating next of 4660046610375530309
[parallel-3] Executing Filter
[parallel-2] Executing Filter
[parallel-4] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Generating next of -6246583658587674878
[parallel-5] Executing Filter
[single-1] Generating next of 1293530146158671551
[parallel-5] Closing