Java_스케줄러

Minki CHO·2023년 1월 29일
0

CodeStates

목록 보기
34/43

스케줄러 Scheduler

? 스케줄러 Scheduler란?
:쓰레드를 관리하는 관리자 역할을 함
:Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드를 제공해줌
:"Reactor의 Scheduler는 복잡한 멀티쓰레딩 프로세스를 단순하게 해줌"

Scheduler 전용 Operator

:Reactor에서는 Scheduler를 위한 별도의 Operator 제공
:즉 적절한 상황에 맞는 쓰레드를 추가로 생성하는 Operator임

1 Scheduler를 추가하지 않은 Sequence의 쓰레드 확인

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * Scheduler를 추가하지 않을 경우
 */
@Slf4j
public class SchedulersExample01 {
    public static void main(String[] args) {
        Flux
            .range(1, 10)
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
    }
}

:1부터 10개의 숫자 emit -> 짝수만 필터링 -> 필터링 된 데이터에 2를 곱하여 subscriber에게 전달

결과

12:55:24.957 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 4
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 8
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 12
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 16
12:55:24.958 [main] INFO com.codestates.example.schedulers.SchedulersExample01 - # onNext: 20

:[main] :main 쓰레드에서 실행됨

2 subscribeOn() Operator 추가하여 쓰레드 하나 더 생성

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * subscribeOn() Operator를 이용해서 Scheduler를 추가할 경우
 */
@Slf4j
public class SchedulersExample02 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .range(1, 10)
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))  // 1)
            .subscribeOn(Schedulers.boundedElastic())     // 2)
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

1) .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
2) .subscribeOn(Schedulers.boundedElastic())
:Schedulers.boundedElastic()
:Scheduler 지정
:구독 직후 실행되는 쓰레드가 main 쓰레드에서 Scheduler로 지정한 쓰레드로 바뀜

결과

13:51:51.897 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # doOnSubscribe
13:51:51.902 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 4
13:51:51.903 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 8
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 12
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 16
13:51:51.904 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample02 - # onNext: 20

:doOnSubscribe() Operator에서 출력되는 로그부터
[boundedElastic-1] 이름의 쓰레드에서 실행됨

subscribeOn() Operator
:구독 직후 실행되는 Operator 체인의 실행 쓰레드를 Scheduler에서 지정한 쓰레드로 변경함

doOnSubscribe() Operator
:구독 발생 직후에 트리거 되는 Operator
:구독 직후에 실행되는 쓰레드와 동일 쓰레드에서 실행
:만약 구독 직후 어떤 동작을 수행하고 싶을 떄, doOnSubscribe()에 로직 작성하면 됨

3 publishOn() Operator 추가

@Slf4j
public class SchedulersExample03 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .range(1, 10)
            .subscribeOn(Schedulers.boundedElastic())
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))

            .publishOn(Schedulers.parallel())  // 1)
            .filter(n -> n % 2 == 0)
            .doOnNext(data -> log.info("# filter doOnNext"))  // 2)

            .publishOn(Schedulers.parallel())    // 3)
            .map(n -> n * 2)
            .doOnNext(data -> log.info("# map doOnNext")) // 4)

            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

:두 개의 publishOn() 추가됨
:publishOn() 추가할 때마다 추가한 publishOn()을 기준으로 Downstream쪽 쓰레드가 publishOn()에서 Scheduler로 지정한 쓰레드로 변경됨

결과

14:18:10.652 [main] INFO com.codestates.example.schedulers.SchedulersExample03 - # doOnSubscribe
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.660 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 4
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 8
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 12
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 16
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample03 - # onNext: 20

doOnNext()
:doOnNext() 바로 앞에 위치한 Operator가 실행될 때 트리거 되는 Operator임
:여기서는 filter()와 map() Operator가 어느 쓰레드에서 실행되는지 확인하는 용도로 사용

subscribeOn() vs publishOn()

Flux
            .range(1, 10)
            .subscribeOn(Schedulers.boundedElastic())
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))

            .publishOn(Schedulers.parallel())  // (1)
            .filter(n -> n % 2 == 0)
            .doOnNext(data -> log.info("# filter doOnNext"))  // (2)

            .publishOn(Schedulers.parallel())    // (3)
            .map(n -> n * 2)
            .doOnNext(data -> log.info("# map doOnNext")) // (4)

            .subscribe(data -> log.info("# onNext: {}", data));

-subscribOn()
:구독 시점 직후의 실행 흐름을 다른 쓰레드로 바꾸는데 사용

(? 구독 시점 직후 실행되는 작업은?
:위 코드에서 range() Operator처럼
원본 데이터를 생성하고, 생성한 데이터를 emit하는 작업이 구독 직후 실행됨<노이해중>)

:즉 주로 데이터 소스에서 데이터를 emti하는 원본Publisher의 실행 쓰레드를 지정하는 역할을 함

-publishOn()
:전달받은 데이터를 가공하는 Operator 앞에 추가해서
실행 쓰레드를 별도로 추가하는 역할을 함

-주의할 점:
publishOn(): Operator앞에 여러번 추가할 경우 별도의 쓰레드가 추가 생성됨
subscribeOn(): 여러번 추가해도 하나의 쓰레드만 생성됨

profile
Developer

0개의 댓글