-> 스레드 간의 경쟁 조건 등 고려해야 하는데, Reactor에서 Scheduler로 문제 최소화 가능
subscribeOn()
subscribeOn()
은 구독 시점 직후에 실행되므로 원본 Publisher의 동작을 수행하기 위한 스레드임 Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic()) //구독 발생 직후 원본 Publisher 동작 처리
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) //구독 발생 시점
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
doOnSubscribe
은 메인 스레드에서 실행, emit 될때 스레드 변경됨publishOn()
publishOn()
기준으로 Downstream의 실행 스레드가 변경됨 Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
doOnSubscribe
은 메인 스레드에서 실행, publishOn
이후는 parallel 스레드에서 실행parallel()
runOn()
operator 로 스레드 할당을 해줘야 하며, parallel() 인자에서 스레드 개수를 지정해줄 수 있다.single()
: 스레드 하나만 생성, 새롭게 구독하더라도 첫 번째 호출에서 생성된 스레드 재사용newSingle()
: 호출할 때마다 새로운 스레드 하나 생성boundedElastic()
: ExecutorService 기반의 스레드 풀을 생성한 후 스레드 사용 후 반납, Blocking I/O에 최적화 parallel()
: CPU 코어 수 만큼 스레드 생성, Non-Blocking I/O에 최적화궁금 & 알아볼 것,,
parallel()
과 다른 operator로 Schedulers.parallel()
지정하는거 차이