Reactor: Execution Control 1 - Scheduler

xellos·2022년 5월 28일
0

JAVA-Reactor

목록 보기
9/11

소개

Reactor에 대하여 학습하면서 Filtering, Transforming, Collecting 등의 작업을 살펴보았다. 여기서 살펴본 대부분의 작업은 추가적인 쓰레드를 사용하지 않고 Main에서 동작한다. 그러나, Reactor에서는 Schedulers를 사용하여 멀티 쓰레딩과 Concurrency를 설정할 수 있다.

Scheduler

Reactor는 모든 연산자를 하나의 Schedulers를 사용하여 동작시킨다. Reactor의 Scheduler는 java.util.concurrent API에 속해있지 않다. Java의 concurrent API 제법 low-level이다.
반면에, Reactor 체인의 모든 작업은 Reactor engine에 의해 수행된다. 결론적으로 우리는 작업을 관리하는데 low-level의 API를 필요로 하지 않는다. 대신에 Reactor는 Scheduler를 조절하고 chain의 동작을 변형시킬 수 있는 선언적인 모델을 제공한다.

이를 확인할 수 있는 코드를 먼저 확인해보자.

1) Reactor 싱글 쓰레드

검증

static void print(String text) {
	System.out.println("[" + Thread.currentThread().getName() + "] " + text);
}

@Test
void singleThread() {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long> og(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            print("Generating next of " + state.getT2());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing "))
        .subscribe(x -> print("sub received : " + x));
}
  • 결과
[Test worker] Executing Filter
[Test worker] Next value is 0
[Test worker] sub received : 0
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Next value is 1
[Test worker] sub received : 1
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Next value is 1
[Test worker] sub received : 1
[Test worker] Generating next of 2
[Test worker] Executing Filter
[Test worker] Next value is 2
[Test worker] sub received : 2
...
[Test worker] Generating next of 4660046610375530309
[Test worker] Executing Filter
[Test worker] Generating next of 7540113804746346429
[Test worker] Executing Filter
[Test worker] Generating next of -6246583658587674878
[Test worker] Closing 
[Test worker] Generating next of 1293530146158671551

위에서 싱글 쓰레드로 동작하기 때문에 Thread.sleep 이나 latch.await 를 사용해 쓰레드를 정지시키지 않았다. 그러나 위에서 확인한 내용을 부분적으로만 맞는 내용이다. Reactor에서 chain 실행 동작을 변경하는 연산자도 있다.


예외적인 경우 검증

이벤트를 발행하는데 지연을 위하여 Duration을 추가하였다.

@Test 
void singleDelayThread() throws InterruptedExceptinon {
	fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
		.delayElements(Duration.ZERO)
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing "))
        .subscribe(x -> print("Sub received : " + x));
        
    Thread.sleep(500);
}
  • 결과
    doOnNext 이벤트 콜백과, 발행된 에빈트를 처리하는 부분이 Test worker 가 아닌 다른 쓰레드에서 처리되는 것을 확인할 수 있다.
    delayElements 연산자가 2개의 쓰레드를 가진 쓰레드 풀을 추가하몄다. (parallel-1, parallel-2)
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 2
[Test worker] Executing Filter
[Test worker] Generating next of 3
[Test worker] Executing Filter
[Test worker] Generating next of 5
[Test worker] Executing Filter
[Test worker] Generating next of 8
[parallel-1] Next value is 0
[Test worker] Executing Filter
[Test worker] Generating next of 13
[Test worker] Executing Filter
[Test worker] Generating next of 21
[parallel-1] sub received : 0
[Test worker] Executing Filter
[Test worker] Generating next of 34
[Test worker] Executing Filter
...
[Test worker] Generating next of 987
[Test worker] Executing Filter
[parallel-2] Next value is 1
[parallel-2] sub received : 1
[Test worker] Generating next of 1597
[Test worker] Executing Filter
[Test worker] Generating next of 2584
[Test worker] Executing Filter
[Test worker] Generating next of 4181
[Test worker] Executing Filter
[Test worker] Generating next of 6765
[Test worker] Executing Filter
[Test worker] Generating next of 10946
[Test worker] Executing Filter
[parallel-3] Next value is 1
[parallel-3] sub received : 1

[Test worker] Generating next of 1293530146158671551

2) Reactor Schedulers

위에서 확인했듯, Reactor 연산자는 chain의 동작을 조절한다. 그러나, 그러한 동작은 다른 scheduler를 사용해서 변경할 수 있다. 대부분의 연산자는 overload메서드를 가지며 이는 scheduler를 인자로 받을 수 있다.
이번에는 Reactor에서 사용할 수 있는 다양한 schedulers를 살펴본다: Reactor는 Schedulers 라는 유틸리티 클래스를 제공한다.

Immediate scheduler

Schedulers.immediate 스케줄러는 현재 실행중인 쓰레드에서 동작을 수해야한다. 모든 작업은 호출한 쓰레드에서 동작한다. 어떠한 작업도 평행선상에서 실행되지 않는다.

  • Reactor 작업 모델의 Default 값이다.
@Test
void immediateScheduler() throws InterruptedException {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long> 0f(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            print("Generating next of " + state.getT2());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator
    	.delayElements(Duration.ofNanos(10), Schedulers.immediate())
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing " ))
        .subscribe(x -> print("Sub received : " + x));
        
    Thread.sleep(500);
}
  • 결과
    chain에 delayElements를 추가했기 때문에, 테스트를 delay를 main 쓰레드에서 동작시키려고 하기 때문에 에러가 발생한다. → Main 쓰레드는 time-based scheduling capacity이기 때문이다.
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable

Single Scheduler

Scheduler.singlesingle-worker쓰레드 풀에서 작업을 수행한다. 싱글 쓰레드에서 동작하기 때문에 모든 작업은 순차적으로 실행되며 어떠한 작업도 동시에(concurrent) 수행되지 않는다.

  • 특정한 on-threadsafe한 작업업을 쓰레드 하나에서 고립시켜서 동작시키는데 유용하다.
@Test
void singleScheduler() throws InterruptedException {
	fibonacciGenerator
    	.delayElements(Duration.ofNanos(10), Schedulers.single())
        .doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing " ))
        .subscribe(x -> print("Sub received : " + x));
        
    Thread.sleep(500);
}
  • 결과
    chain 에 delayElement를 추가했다. 아래의 결과를 보면 single 쓰레드에 delay를 관리하려도 하는 것을 볼 수 있다.
[Test worker] Generating next of 1
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[Test worker] Generating next of 3
[Test worker] Generating next of 5
[Test worker] Generating next of 8
[Test worker] Generating next of 13
[Test worker] Generating next of 21
[Test worker] Generating next of 34
[Test worker] Generating next of 55
[single-1] Next value is 0
[Test worker] Generating next of 89
[single-1] Sub received : 0
[Test worker] Generating next of 144
[Test worker] Generating next of 233
[Test worker] Generating next of 377
[Test worker] Generating next of 610
[Test worker] Generating next of 987
[Test worker] Generating next of 1597
[single-1] Next value is 1
[single-1] Sub received : 1
[Test worker] Generating next of 2584
[single-1] Next value is 1
[Test worker] Generating next of 4181
[single-1] Sub received : 1
[Test worker] Generating next of 6765
...
[single-1] Next value is 1779979416004714189
[single-1] Sub received : 1779979416004714189
[single-1] Next value is 2880067194370816120
[single-1] Sub received : 2880067194370816120
[single-1] Next value is 4660046610375530309
[single-1] Sub received : 4660046610375530309
[single-1] Next value is 7540113804746346429
[single-1] Sub received : 7540113804746346429
[single-1] Closing 

Single Scheduler는 논블로킹, 연산 집약적인 연산자 역할로 만들어졌다. 이는 논블로킹 작업을 내부의 queue에서 실행하는 이벤트 루프처럼 다룰 수 있다. 만약, 내부에서 blocking API 를 호출하면 에러를 발생시킨다.

@Test
void singleScheduler2() throws InterruptedException {
	fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .delayElements(Duration.ZERO, Schedulers.single())
        .window(10) //사이즈에 맞는 버퍼를 생성한다.
        .doOnNext(x -> print("Next Value is " + x))
        .doFinally(x -> print("Closing " + x))
        .subscribe(x -> print("Sub received : " + x.blockFirst());
        
    Thread.sleep(500);
}
  • 결과
    위의 코드에서 blockFirst API 를 사용하여 첫 번째 이벤트로 돌아가도록 했기 때문에 에러가 발생한다.
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 2
[Test worker] Executing Filter
...
[Test worker] Generating next of 5527939700884757
[Test worker] Executing Filter
[ERROR] (single-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Generating next of 8944394323791464
[Test worker] Executing Filter
[Test worker] Generating next of 14472334024676221
[Test worker] Executing Filter
[Test worker] Generating next of 23416728348467685
[Test worker] Executing Filter
[Test worker] Generating next of 37889062373143906
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Executing Filter
[Test worker] Generating next of 61305790721611591
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Executing Filter
[Test worker] Generating next of 99194853094755497

parallel scheduler

Schedulers.parallelmultiple-worker 쓰레드 풀에서 동작을 수행합니다. 이는 가용 가능한 processor를 기반으로 worker를 생성합니다. 다양한 Reactor 연산자에서 기본값으로 사용됩니다.

@Test
void parallelScheduler() throws InterruptedException {
	fibonacciGenerator
    	.delayElements(Duration.ofNanos(10), Schedulers.parallel())
        .doOnNext(x -> print("Next value is " + x)
        .doFinally(x -> print("Closing "))
        .subscribe(x -> print("Sub received : "+ x));
     
    Thread.sleep(500);
}
  • 결과
    Single Scheduler와 유사하게, parallel 역시 비동기 작업을 전제로 만들어졌습니다. 마찬가지로 blocking API 를 호출하면 예외가 발생합니다.
[Test worker] Generating next of 1
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[parallel-1] Next value is 0
[Test worker] Generating next of 3
[parallel-1] Sub received : 0
[Test worker] Generating next of 5
...
[Test worker] Generating next of 832040
[parallel-3] Next value is 1
[parallel-3] Sub received : 1
[Test worker] Generating next of 1346269
[Test worker] Generating next of 2178309
[parallel-4] Next value is 2
[parallel-4] Sub received : 2
[parallel-5] Next value is 3
...
[parallel-4] Next value is 4660046610375530309
[parallel-4] Sub received : 4660046610375530309
[parallel-5] Next value is 7540113804746346429
[parallel-5] Sub received : 7540113804746346429
[parallel-5] Closing 

boundedElastic Scheduler

ExecutorService 기반의 Worker를 생성하여 사용이 종료되면 재사용합니다. worker가 1분 이상 유휴상태이면 제거됩니다.

  • 아래의 예제에서 확인할 수 있듯, 위의 schduler와 달리 blocking 작업을 호출해도 예외를 발생시키지 않습니다.
@Test
void boundedElasticScheduler() throws InterrupedException {
	fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .delayElements(Duration.ZERO, Schedulers.boundedElastic())
        .window(10)
		.doOnNext(x -> print("Next value is " + x))
        .doFinally(x -> print("Closing " + x))
        .subscribe(x -> print("Sub received : " + x.blockingFirst()));
       
    Thread.sleep(500);
}

ExecutorService scheduler

Schedulers.formExecutor Java ExecutorService를 이용하여 scheduler를 생성한다. scheduler가 쓰레드 생성을 점유하지 않고, ExecutorService가 통제한다. scheduler는 다른 scheduler가 개입할 수 없기 때문에 개발자가 반드시 ExecutorService 를 사용해 생명주기를 관리해야 한다.

@Test
void executorServiceScheduler() throws InterruptedException {
	ExecutorService executor = Executors.newStinleThreadExecutor();
    
    fibonacciGenerator
    	.filter(x -> {
        	print("Executing Filter");
            return x < 100;
        })
        .delayElements(Duration.ZERO, Schedulers.fromExecutor(executor))
        .doOnNext(x -> print("Next value is + x))
        .doFinally(x -> print("Closing " + executor.isShutdown()))
        .subscribe(x -> print("Sub received : "+ x));
        
    Thread.sleep(5000);
    //executor.shutdownNow());
    print("Is shutdown ? " + executor.isShutdown());
}
  • 결과
[Test worker] Executing Filter
[pool-1-thread-1] Next value is 89
[pool-1-thread-1] Sub received : 89
...
[Test worker] Generating next of 7540113804746346429
[Test worker] Executing Filter
[Test worker] Generating next of -6246583658587674878
[Test worker] Closing false
[Test worker] Generating next of 1293530146158671551
[Test worker] Is shutdown ? false

0개의 댓글