? 스케줄러 Scheduler란?
:쓰레드를 관리하는 관리자 역할을 함
:Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드를 제공해줌
:"Reactor의 Scheduler는 복잡한 멀티쓰레딩 프로세스를 단순하게 해줌"
:Reactor에서는 Scheduler를 위한 별도의 Operator 제공
:즉 적절한 상황에 맞는 쓰레드를 추가로 생성하는 Operator임
1 Scheduler를 추가하지 않은 Sequence의 쓰레드 확인
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* Scheduler를 추가하지 않을 경우
*/
@Slf4j
public class SchedulersExample01 {
public static void main(String[] args) {
Flux
.range(1, 10)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
:1부터 10개의 숫자 emit -> 짝수만 필터링 -> 필터링 된 데이터에 2를 곱하여 subscriber에게 전달
결과
12:55:24.957 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 4
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 8
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 12
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 16
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 20
:[main]
:main 쓰레드에서 실행됨
2 subscribeOn() Operator 추가하여 쓰레드 하나 더 생성
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* subscribeOn() Operator를 이용해서 Scheduler를 추가할 경우
*/
@Slf4j
public class SchedulersExample02 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 10)
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) // 1)
.subscribeOn(Schedulers.boundedElastic()) // 2)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
1) .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
2) .subscribeOn(Schedulers.boundedElastic())
:Schedulers.boundedElastic()
:Scheduler 지정
:구독 직후 실행되는 쓰레드가 main 쓰레드에서 Scheduler로 지정한 쓰레드로 바뀜
결과
13:51:51.897 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # doOnSubscribe
13:51:51.902 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 4
13:51:51.903 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 8
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 12
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 16
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 20
:doOnSubscribe() Operator에서 출력되는 로그부터
[boundedElastic-1]
이름의 쓰레드에서 실행됨
subscribeOn() Operator
:구독 직후 실행되는 Operator 체인의 실행 쓰레드를 Scheduler에서 지정한 쓰레드로 변경함
doOnSubscribe() Operator
:구독 발생 직후에 트리거 되는 Operator
:구독 직후에 실행되는 쓰레드와 동일 쓰레드에서 실행
:만약 구독 직후 어떤 동작을 수행하고 싶을 떄, doOnSubscribe()에 로직 작성하면 됨
3 publishOn() Operator 추가
@Slf4j
public class SchedulersExample03 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 10)
.subscribeOn(Schedulers.boundedElastic())
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel()) // 1)
.filter(n -> n % 2 == 0)
.doOnNext(data -> log.info("# filter doOnNext")) // 2)
.publishOn(Schedulers.parallel()) // 3)
.map(n -> n * 2)
.doOnNext(data -> log.info("# map doOnNext")) // 4)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
:두 개의 publishOn() 추가됨
:publishOn() 추가할 때마다 추가한 publishOn()을 기준으로 Downstream쪽 쓰레드가 publishOn()에서 Scheduler로 지정한 쓰레드로 변경됨
결과
14:18:10.652 [main] INFO com.codestates.example.schedulers.SchedulersExample03 - # doOnSubscribe
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.660 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 4
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 8
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 12
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 16
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 20
doOnNext()
:doOnNext() 바로 앞에 위치한 Operator가 실행될 때 트리거 되는 Operator임
:여기서는 filter()와 map() Operator가 어느 쓰레드에서 실행되는지 확인하는 용도로 사용
subscribeOn() vs publishOn()
Flux
.range(1, 10)
.subscribeOn(Schedulers.boundedElastic())
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel()) // (1)
.filter(n -> n % 2 == 0)
.doOnNext(data -> log.info("# filter doOnNext")) // (2)
.publishOn(Schedulers.parallel()) // (3)
.map(n -> n * 2)
.doOnNext(data -> log.info("# map doOnNext")) // (4)
.subscribe(data -> log.info("# onNext: {}", data));
-subscribOn()
:구독 시점 직후의 실행 흐름을 다른 쓰레드로 바꾸는데 사용
(? 구독 시점 직후 실행되는 작업은?
:위 코드에서 range() Operator처럼
원본 데이터를 생성하고, 생성한 데이터를 emit하는 작업이 구독 직후 실행됨<노이해중>)
:즉 주로 데이터 소스에서 데이터를 emti하는 원본Publisher의 실행 쓰레드를 지정하는 역할을 함
-publishOn()
:전달받은 데이터를 가공하는 Operator 앞에 추가해서
실행 쓰레드를 별도로 추가하는 역할을 함
-주의할 점:
publishOn(): Operator앞에 여러번 추가할 경우 별도의 쓰레드가 추가 생성됨
subscribeOn(): 여러번 추가해도 하나의 쓰레드만 생성됨