Reactor에 대하여 학습하면서 Filtering, Transforming, Collecting 등의 작업을 살펴보았다. 여기서 살펴본 대부분의 작업은 추가적인 쓰레드를 사용하지 않고 Main에서 동작한다. 그러나, Reactor에서는 Schedulers를 사용하여 멀티 쓰레딩과 Concurrency를 설정할 수 있다.
Reactor는 모든 연산자를 하나의 Schedulers를 사용하여 동작시킨다. Reactor의 Scheduler는 java.util.concurrent
API에 속해있지 않다. Java의 concurrent API 제법 low-level이다.
반면에, Reactor 체인의 모든 작업은 Reactor engine에 의해 수행된다. 결론적으로 우리는 작업을 관리하는데 low-level의 API를 필요로 하지 않는다. 대신에 Reactor는 Scheduler를 조절하고 chain의 동작을 변형시킬 수 있는 선언적인 모델을 제공한다.
이를 확인할 수 있는 코드를 먼저 확인해보자.
static void print(String text) {
System.out.println("[" + Thread.currentThread().getName() + "] " + text);
}
@Test
void singleThread() {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> og(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
print("Generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing "))
.subscribe(x -> print("sub received : " + x));
}
[Test worker] Executing Filter
[Test worker] Next value is 0
[Test worker] sub received : 0
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Next value is 1
[Test worker] sub received : 1
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Next value is 1
[Test worker] sub received : 1
[Test worker] Generating next of 2
[Test worker] Executing Filter
[Test worker] Next value is 2
[Test worker] sub received : 2
...
[Test worker] Generating next of 4660046610375530309
[Test worker] Executing Filter
[Test worker] Generating next of 7540113804746346429
[Test worker] Executing Filter
[Test worker] Generating next of -6246583658587674878
[Test worker] Closing
[Test worker] Generating next of 1293530146158671551
위에서 싱글 쓰레드로 동작하기 때문에 Thread.sleep
이나 latch.await
를 사용해 쓰레드를 정지시키지 않았다. 그러나 위에서 확인한 내용을 부분적으로만 맞는 내용이다. Reactor에서 chain 실행 동작을 변경하는 연산자도 있다.
이벤트를 발행하는데 지연을 위하여 Duration을 추가하였다.
@Test
void singleDelayThread() throws InterruptedExceptinon {
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.delayElements(Duration.ZERO)
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing "))
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
Test worker
가 아닌 다른 쓰레드에서 처리되는 것을 확인할 수 있다.delayElements
연산자가 2개의 쓰레드를 가진 쓰레드 풀을 추가하몄다. (parallel-1
, parallel-2
)[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 2
[Test worker] Executing Filter
[Test worker] Generating next of 3
[Test worker] Executing Filter
[Test worker] Generating next of 5
[Test worker] Executing Filter
[Test worker] Generating next of 8
[parallel-1] Next value is 0
[Test worker] Executing Filter
[Test worker] Generating next of 13
[Test worker] Executing Filter
[Test worker] Generating next of 21
[parallel-1] sub received : 0
[Test worker] Executing Filter
[Test worker] Generating next of 34
[Test worker] Executing Filter
...
[Test worker] Generating next of 987
[Test worker] Executing Filter
[parallel-2] Next value is 1
[parallel-2] sub received : 1
[Test worker] Generating next of 1597
[Test worker] Executing Filter
[Test worker] Generating next of 2584
[Test worker] Executing Filter
[Test worker] Generating next of 4181
[Test worker] Executing Filter
[Test worker] Generating next of 6765
[Test worker] Executing Filter
[Test worker] Generating next of 10946
[Test worker] Executing Filter
[parallel-3] Next value is 1
[parallel-3] sub received : 1
[Test worker] Generating next of 1293530146158671551
위에서 확인했듯, Reactor 연산자는 chain의 동작을 조절한다. 그러나, 그러한 동작은 다른 scheduler
를 사용해서 변경할 수 있다. 대부분의 연산자는 overload메서드를 가지며 이는 scheduler
를 인자로 받을 수 있다.
이번에는 Reactor에서 사용할 수 있는 다양한 schedulers를 살펴본다: Reactor는 Schedulers
라는 유틸리티 클래스를 제공한다.
Schedulers.immediate
스케줄러는 현재 실행중인 쓰레드에서 동작을 수해야한다. 모든 작업은 호출한 쓰레드에서 동작한다. 어떠한 작업도 평행선상에서 실행되지 않는다.
@Test
void immediateScheduler() throws InterruptedException {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> 0f(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
print("Generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.delayElements(Duration.ofNanos(10), Schedulers.immediate())
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing " ))
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
time-based scheduling capacity
이기 때문이다.[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
Scheduler.single
은 single-worker
쓰레드 풀에서 작업을 수행한다. 싱글 쓰레드에서 동작하기 때문에 모든 작업은 순차적으로 실행되며 어떠한 작업도 동시에(concurrent) 수행되지 않는다.
@Test
void singleScheduler() throws InterruptedException {
fibonacciGenerator
.delayElements(Duration.ofNanos(10), Schedulers.single())
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing " ))
.subscribe(x -> print("Sub received : " + x));
Thread.sleep(500);
}
[Test worker] Generating next of 1
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[Test worker] Generating next of 3
[Test worker] Generating next of 5
[Test worker] Generating next of 8
[Test worker] Generating next of 13
[Test worker] Generating next of 21
[Test worker] Generating next of 34
[Test worker] Generating next of 55
[single-1] Next value is 0
[Test worker] Generating next of 89
[single-1] Sub received : 0
[Test worker] Generating next of 144
[Test worker] Generating next of 233
[Test worker] Generating next of 377
[Test worker] Generating next of 610
[Test worker] Generating next of 987
[Test worker] Generating next of 1597
[single-1] Next value is 1
[single-1] Sub received : 1
[Test worker] Generating next of 2584
[single-1] Next value is 1
[Test worker] Generating next of 4181
[single-1] Sub received : 1
[Test worker] Generating next of 6765
...
[single-1] Next value is 1779979416004714189
[single-1] Sub received : 1779979416004714189
[single-1] Next value is 2880067194370816120
[single-1] Sub received : 2880067194370816120
[single-1] Next value is 4660046610375530309
[single-1] Sub received : 4660046610375530309
[single-1] Next value is 7540113804746346429
[single-1] Sub received : 7540113804746346429
[single-1] Closing
Single Scheduler는 논블로킹, 연산 집약적인 연산자 역할로 만들어졌다. 이는 논블로킹 작업을 내부의 queue에서 실행하는 이벤트 루프처럼 다룰 수 있다. 만약, 내부에서 blocking API 를 호출하면 에러를 발생시킨다.
@Test
void singleScheduler2() throws InterruptedException {
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.delayElements(Duration.ZERO, Schedulers.single())
.window(10) //사이즈에 맞는 버퍼를 생성한다.
.doOnNext(x -> print("Next Value is " + x))
.doFinally(x -> print("Closing " + x))
.subscribe(x -> print("Sub received : " + x.blockFirst());
Thread.sleep(500);
}
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 1
[Test worker] Executing Filter
[Test worker] Generating next of 2
[Test worker] Executing Filter
...
[Test worker] Generating next of 5527939700884757
[Test worker] Executing Filter
[ERROR] (single-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Generating next of 8944394323791464
[Test worker] Executing Filter
[Test worker] Generating next of 14472334024676221
[Test worker] Executing Filter
[Test worker] Generating next of 23416728348467685
[Test worker] Executing Filter
[Test worker] Generating next of 37889062373143906
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Executing Filter
[Test worker] Generating next of 61305790721611591
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread single-1
[Test worker] Executing Filter
[Test worker] Generating next of 99194853094755497
Schedulers.parallel
은 multiple-worker
쓰레드 풀에서 동작을 수행합니다. 이는 가용 가능한 processor를 기반으로 worker를 생성합니다. 다양한 Reactor 연산자에서 기본값으로 사용됩니다.
@Test
void parallelScheduler() throws InterruptedException {
fibonacciGenerator
.delayElements(Duration.ofNanos(10), Schedulers.parallel())
.doOnNext(x -> print("Next value is " + x)
.doFinally(x -> print("Closing "))
.subscribe(x -> print("Sub received : "+ x));
Thread.sleep(500);
}
[Test worker] Generating next of 1
[Test worker] Generating next of 1
[Test worker] Generating next of 2
[parallel-1] Next value is 0
[Test worker] Generating next of 3
[parallel-1] Sub received : 0
[Test worker] Generating next of 5
...
[Test worker] Generating next of 832040
[parallel-3] Next value is 1
[parallel-3] Sub received : 1
[Test worker] Generating next of 1346269
[Test worker] Generating next of 2178309
[parallel-4] Next value is 2
[parallel-4] Sub received : 2
[parallel-5] Next value is 3
...
[parallel-4] Next value is 4660046610375530309
[parallel-4] Sub received : 4660046610375530309
[parallel-5] Next value is 7540113804746346429
[parallel-5] Sub received : 7540113804746346429
[parallel-5] Closing
ExecutorService
기반의 Worker를 생성하여 사용이 종료되면 재사용합니다. worker가 1분 이상 유휴상태이면 제거됩니다.
@Test
void boundedElasticScheduler() throws InterrupedException {
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.delayElements(Duration.ZERO, Schedulers.boundedElastic())
.window(10)
.doOnNext(x -> print("Next value is " + x))
.doFinally(x -> print("Closing " + x))
.subscribe(x -> print("Sub received : " + x.blockingFirst()));
Thread.sleep(500);
}
Schedulers.formExecutor
Java ExecutorService를 이용하여 scheduler를 생성한다. scheduler가 쓰레드 생성을 점유하지 않고, ExecutorService가 통제한다. scheduler는 다른 scheduler가 개입할 수 없기 때문에 개발자가 반드시 ExecutorService 를 사용해 생명주기를 관리해야 한다.
@Test
void executorServiceScheduler() throws InterruptedException {
ExecutorService executor = Executors.newStinleThreadExecutor();
fibonacciGenerator
.filter(x -> {
print("Executing Filter");
return x < 100;
})
.delayElements(Duration.ZERO, Schedulers.fromExecutor(executor))
.doOnNext(x -> print("Next value is + x))
.doFinally(x -> print("Closing " + executor.isShutdown()))
.subscribe(x -> print("Sub received : "+ x));
Thread.sleep(5000);
//executor.shutdownNow());
print("Is shutdown ? " + executor.isShutdown());
}
[Test worker] Executing Filter
[pool-1-thread-1] Next value is 89
[pool-1-thread-1] Sub received : 89
...
[Test worker] Generating next of 7540113804746346429
[Test worker] Executing Filter
[Test worker] Generating next of -6246583658587674878
[Test worker] Closing false
[Test worker] Generating next of 1293530146158671551
[Test worker] Is shutdown ? false