Reactor: Execution Control 2 - Parallel Processing

xellos·2022년 5월 29일
1

JAVA-Reactor

목록 보기
10/11

소개

Reactor 발행자와 구독자는 쓰레드를 생성하지 않는다. 그러나 이전 글에서 확인했듯이, 이러한 행동을 변경할 수 있는 연산자가 존재한다. 이전 글에서는 delay 연산자가 Reactor chain의 메인 쓰레드가 아닌 다른 곳으로 실행을 바꾸는 것을 보았다. 그러나 실행을 변경하기 위해서 delay 나 timeout을 사용하지는 않는다.

Reactor는 체인 실행 변경을 하기 위한 목적으로 publishOnsubscribeOn 연산자를 제공한다. 두 연산자 모두 반응형 chain의 실행 흐름을 변경할 수 있다.

1) PublishOn 연산자

publishOn 연산자는 실행 chain에 설정된 포인트에 기반하여 발행자로 부터 이벤트를 가로채서 나머지 체인의 다른 scheduler로 보낸다. 결과적으로, 반응형 chain 의 downstream 쓰레드 context 를 변경한다.

@Test
void publishOn() throws InterruptedException {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long>of(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            print("Generating next of " + state.getT2());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator
    	.publishOn(Schedulers.single())
        .filter(x -> {
        	return x < 50;
        })
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing "))
        .subscribe(x -> print("Sub received : " + x));
    
    Thread.sleep(500);
}

  • 결과
    Main 쓰레드에서 테스트를 실행하지 않기 때문에 Thread.sleep 으로 메인 쓰레드를 정지시켰다.
    이벤트 자체는 Main 에서 생성하지만, 이후 처리를 single-1 쓰레드로 넘기는 것을 확인할 수 있다.
[Test worker] Generating next of 1
[single-1] Executing Filter
[single-1] Next Value is 0
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[Test worker] Generating next of 3
[Test worker] Generating next of 5
[Test worker] Generating next of 8
[single-1] Sub received : 0
[Test worker] Generating next of 13
[single-1] Executing Filter
[Test worker] Generating next of 21
[single-1] Next Value is 1
[Test worker] Generating next of 34
[single-1] Sub received : 1

2) SubscribeOn 연산자

subscribeOn 연산자는 execution chain에서 이벤트를 가로채서 complete chain 을 위해 다른 스케줄러로 보낸다. 여기서 중요한 것은 해당 연산자가 complete chain 을 위하여 실행 컨텍스트르 바꾼다는 것이다. 이는 downstream chain의 실행만 바꾸는 publishOn 연산자와 차이가 있다.

@Test
void subscribeOn() throws InterruptedException {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long> of(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            print("Generating next of " + state.getT2());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing "))
        .subscribeOn(Schedulers.single())
        .subscribe(x -> print("Sub received : " + x));
        
    Thread.sleep(500);
}

  • 결과
    모든 연산이 subscribeOn 메서드에 설정한 single thread 에서 실행된 것을 확인할 수 있다.
[single-1] Executing Filter
[single-1] Next value is 0
[single-1] Sub received : 0
[single-1] Generating next of 1
[single-1] Executing Filter
[single-1] Next value is 1
[single-1] Sub received : 1
[single-1] Generating next of 1
[single-1] Executing Filter
[single-1] Next value is 1
[single-1] Sub received : 1
[single-1] Generating next of 2
[single-1] Executing Filter
[single-1] Next value is 2
[single-1] Sub received : 2
[single-1] Generating next of 3
[single-1] Executing Filter
[single-1] Next value is 3
[single-1] Sub received : 3
[single-1] Generating next of 5
[single-1] Executing Filter
[single-1] Next value is 5
[single-1] Sub received : 5
...
[single-1] Generating next of 2880067194370816120
[single-1] Executing Filter
[single-1] Generating next of 4660046610375530309
[single-1] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Executing Filter
[single-1] Generating next of -6246583658587674878
[single-1] Closing 
[single-1] Generating next of 1293530146158671551

3) PublishOn 연산자와 SubscribeOn 연산자 동시 설정

같은 chain에 대하여 subscribeOnpublishOn을 같이 설정할 수 있다.

  • subscribeOn 연산자는 완전한 reactive chain 을 설정된 스케줄러에서 실행한다.
  • 그러나 publishOn 연산자는 downstream chain을 바꾸어 명시된 스케줄러에서 처리하도록 한다.
@Test
void publishOnAndSubscribeOn() throws InterruptedException {
	fibonacciGenerator
    	.publishOn(Schedulers.parallel())
        .filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .doOnNext(x -> print("Next Value is " + x))
        .doFinally(x -> print("Closing "))
        .subscribeOn(Schedulers.single())
        .subscribe(x -> print("Sub received : " + x));
        
    Thread.sleep(500);
}

  • 결과
    이벤트는 subscribeOn 메서드에서 설정한 single-1 쓰레드에서 생성한다. 나머지 체인은 publishOn 메서드에서 설정한 parallel 쓰레드에서 실행된다.
[single-1] Generating next of 1
[single-1] Generating next of 1
[single-1] Generating next of 2
[single-1] Generating next of 3
[parallel-1] Executing Filter
[single-1] Generating next of 5
[parallel-1] Next Value is 0
[single-1] Generating next of 8
[single-1] Generating next of 13
[parallel-1] Sub received : 0
[parallel-1] Executing Filter
[parallel-1] Next Value is 1
[single-1] Generating next of 21
[parallel-1] Sub received : 1
[parallel-1] Executing Filter
[single-1] Generating next of 34
[parallel-1] Next Value is 1
[parallel-1] Sub received : 1
[parallel-1] Executing Filter
[parallel-1] Next Value is 2
[parallel-1] Sub received : 2
[parallel-1] Executing Filter
...
[single-1] Generating next of 1779979416004714189
[single-1] Generating next of 2880067194370816120
[single-1] Generating next of 4660046610375530309
[parallel-1] Executing Filter
[parallel-1] Executing Filter
[parallel-1] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Generating next of -6246583658587674878
[single-1] Generating next of 1293530146158671551
[parallel-1] Executing Filter
[parallel-1] Closing 

4) ParallelFlux

Reactor는 ParallelFlux 를 제공하여 스트림은 round-robin 방식으로 다수의 스트림으로 분할하는 기능을 제공합니다. ParallelFlux는 Flux 에 parallel 연산자를 사용하여 만들 수 있습니다.

기본적으로, 스트림은 현재 가용 가능한 CPU 코어의 수 만큼 나누어집니다. ParallelFlux는 스트림을 분할할 뿐 실행 모델을 변경하지 않습니다. 분할된 스트림은 runOn 연산자를 사용하여 parallel로 처리할 수 있습니다. 이는 publishOn 연산자와 비슷하게 downstream 에 명시된 스케줄러에서 동작을 수행합니다.

  • ParallelFlux는 doFinally 생명주기 Hook을 제공하지 않습니다.
  • sequential() 연산자를 이용하여 다시 Flux로 변환할 수 있습니다. 이렇게 변환하면 다시 doFinally Hook 을 사용할 수 있습니다.
@Test
void parallel() throws InterruptedException {
	fibonacciGenerator
    	.parallel()
        .runOn(Schedulers.parallel())
        .filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .doOnNext(x -> print("Next value is " + x))
        .sequential()
        .doFinally(x -> print("Closing "))
        .subscribeOn(Schedulers.single())
        .subscribe(x -> print("sub received : " + x));
        
    Thread.sleep(500);
}
  • 결과
[single-1] Generating next of 1
[single-1] Generating next of 1
[parallel-2] Executing Filter
[parallel-1] Executing Filter
[parallel-2] Next value is 1
[parallel-1] Next value is 0
[parallel-3] Executing Filter
[parallel-3] Next value is 1
[single-1] Generating next of 2
[parallel-2] Sub received : 1
[single-1] Generating next of 3
[parallel-4] Executing Filter
[single-1] Generating next of 5
[parallel-4] Next value is 2
[single-1] Generating next of 8
[parallel-2] Sub received : 0
[parallel-2] Sub received : 1
[parallel-7] Executing Filter
[parallel-7] Next value is 8
[single-1] Generating next of 13
[parallel-6] Executing Filter
[parallel-8] Executing Filter
[parallel-5] Executing Filter
[parallel-5] Next value is 3
[parallel-6] Next value is 5
[single-1] Generating next of 21
[single-1] Generating next of 34
[parallel-2] Sub received : 2
[parallel-2] Sub received : 3
[single-1] Generating next of 55
...
[single-1] Generating next of 1779979416004714189
[single-1] Generating next of 2880067194370816120
[single-1] Generating next of 4660046610375530309
[parallel-3] Executing Filter
[parallel-2] Executing Filter
[parallel-4] Executing Filter
[single-1] Generating next of 7540113804746346429
[single-1] Generating next of -6246583658587674878
[parallel-5] Executing Filter
[single-1] Generating next of 1293530146158671551
[parallel-5] Closing 

0개의 댓글