[Reactive] 10. Scheduler

Jimin Lim·2024년 4월 8일
0

Spring

목록 보기
17/18
post-thumbnail
post-custom-banner

10.1 스레드의 개념 이해

  • 물리적인 스레드: 물리적인 코어를 논리적으로 나눈 코어
  • 논리적인 스레드: 프로세스 내에서 실행되는 세부 작업의 단위, 이론적으로는 메모리가 허용하는 범위 내에서 얼마든지 만들 수 있지만 물리적인 스레드의 가용 범위 내에서 실행 가능

10.2 Scheduler란?

  • Scheduler: 어떤 스레드에서 무엇을 처리할지 제어

-> 스레드 간의 경쟁 조건 등 고려해야 하는데, Reactor에서 Scheduler로 문제 최소화 가능

10.3 Scheduler를 위한 전용 Operator

subscribeOn()

  • 구독이 발생한 직후 실행될 스레드를 지정하는 Operator
  • subscribeOn()구독 시점 직후에 실행되므로 원본 Publisher의 동작을 수행하기 위한 스레드임
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic()) //구독 발생 직후 원본 Publisher 동작 처리
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe")) //구독 발생 시점
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
  • doOnSubscribe은 메인 스레드에서 실행, emit 될때 스레드 변경됨
  • 다음 로그들은 boundedElastic 스레드에서 실행

publishOn()

  • Downstream으로 Signal 전송할 때 실행되는 스레드 제어
  • publishOn() 기준으로 Downstream의 실행 스레드가 변경됨
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
  • doOnSubscribe은 메인 스레드에서 실행, publishOn 이후는 parallel 스레드에서 실행

parallel()

  • subscribeOn, publishOn은 동시성을 가지는 논리적인 스레드에 해당하지만 parallel은 병렬성을 가지는 물리적인 스레드에 해당
    • 라운드 로빈 방식으로 CPU 코어 개수만큼 스레드를 병렬로 실행한다.
  • runOn() operator 로 스레드 할당을 해줘야 하며, parallel() 인자에서 스레드 개수를 지정해줄 수 있다.

10.4 publishOn()과 subscribeOn()의 동작 이해

publishOn() 을 여러 개 사용할 경우

  • publishOn() 이후로 지정한 스케줄러로 실행

subscribeOn(), publishOn() 을 같이 사용한 경우

  • subscribeOn()은 구독이 발생한 직후 실행될 스레드를 지정하는 것이므로 fromArray()는 A thread에서 실행
  • filter는 별도의 publishOn()이 추가되지 않았으므로 여전히 같은 스레드에서 실행
  • publishOn()이 추가되면서 B스레드로 변경

정리

  • publishOn()은 한 개 이상 사용 가능하며 실행 스레드를 목적에 맞게 분리
  • subscribeOn()은 체인상 어느 위치에 있든 구독 시점 직후, 즉 Publisher가 데이터를 emit하기 전 실행 스레드 변경

10.5 Scheduler 종류

  • single(): 스레드 하나만 생성, 새롭게 구독하더라도 첫 번째 호출에서 생성된 스레드 재사용
  • newSingle(): 호출할 때마다 새로운 스레드 하나 생성
  • boundedElastic(): ExecutorService 기반의 스레드 풀을 생성한 후 스레드 사용 후 반납, Blocking I/O에 최적화
  • parallel(): CPU 코어 수 만큼 스레드 생성, Non-Blocking I/O에 최적화

궁금 & 알아볼 것,,

  • operator parallel()과 다른 operator로 Schedulers.parallel() 지정하는거 차이
  • 10-1, 10-2에서 예제의 스케줄러 차이로 실행 순서가 변경될 것 같은데 구체적인 동작 이해 @_@
profile
💻 ☕️ 🏝 🍑 🍹 🏊‍♀️
post-custom-banner

0개의 댓글