Scheduler 의 종류

알파로그·2023년 11월 10일

✏️ Scheduler 의 종류

📍 immediate()

  • 별도의 쓰레드를 추가 할당하지 않고, 현재 쓰레드에서 실행시킴
public void immediate() throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7})
            .publishOn(Schedulers.parallel())
            .filter(data -> data > 3)
            .doOnNext(data -> print("filter : " + data))
            .publishOn(Schedulers.immediate())
            .map(data -> data * 10)
            .doOnNext(data -> print("map : " + data))
            .subscribe(Start::print);

    Thread.sleep(500L);
}
  • 첫번째 Thread 할당에서 parallel() 을 지정하고,
    두번째에서 immediate() 을 사용해 기존 쓰레드를 계속 사용하도록 설정함
[parallel-1] INFO -- filter : 5
[parallel-1] INFO -- map : 50
[parallel-1] INFO -- sub : 50
[parallel-1] INFO -- filter : 7
[parallel-1] INFO -- map : 70
[parallel-1] INFO -- sub : 70

📍 single()

  • 하나의 쓰레드를 생성해 Scheduler 가 제거될 때 까지 사용함
  • low latency (저지연) 일회성 실행에 최적화 되어 있다.
public void single() throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7})
            .publishOn(Schedulers.single())
            .filter(data -> data > 3)
            .doOnNext(data -> print("filter : " + data))
            .publishOn(Schedulers.single())
            .map(data -> data * 10)
            .doOnNext(data -> print("map : " + data))
            .subscribe(Start::print);

    Thread.sleep(500L);
}
  • immediate() 와 다르게 single() 이 처음 호출되면 새로운 Thread 를 생성하고,
    한번 더 호출될 경우 새로 생성하지 않고 그대로 재사용 되고 있다.
[single-1] INFO -- filter : 5
[single-1] INFO -- filter : 7
[single-1] INFO -- map : 50
[single-1] INFO -- sub : 50
[single-1] INFO -- map : 70
[single-1] INFO -- sub : 70

📍 boundedElasic()

  • 쓰레드 풀을 생성해 생성된 쓰레드를 재사용
  • 생성할 수 있는 쓰레드 수에 제한이 있다.
    • Default 는 CPU 코어 수 * 10
  • Blocking I/O 같이 긴 실행 시간을 가진 작업에 최적화

📍 parallel()

  • 여러 쓰레드를 할당해 동시에 작업을 수행
  • Non-Blocking I/O 작업에 최적화
    • CPU 코어의 수 만큼 쓰레드를 생성할 수 있음

📍 fromExecutorService()

  • 기존의 ExecutorService 를 사용해 쓰레드를 생성
  • 의미있는 식별자를 제공하기 때문에 Metric 에서 주로 사용된다.

📍 new…()

  • 다양한 유형의 새로운 Scheduler 를 생성할 수 있다.
    • newSingle(), newParallel(), newBoundedElasic()
  • 호출 할 때 마다 무조건 새로운 Thread 를 생성함
    • 중혹 호출시 재활용 되는 single 도 new 를 붙이면 항상 Thread 를 생성함
  • newBoundedElastic() 으로 Thread 의 이름과 수를 설정할 수 있다.
public void newBoundedElastic() {
    Scheduler scheduler = Schedulers.newBoundedElastic(2, 2, "my-Thread");
    Mono<Integer> mono = Mono
            .just(1)
            .subscribeOn(scheduler);

    for (int i = 1; i < 7; i++)
				Long ms = i > 2 ? 0L : 3000L;
        subscribe(mono, ms, i);
}

private void subscribe(Mono<Integer> mono, Long ms, int number) {
    mono.subscribe(data -> {
        log.info("subscribe " + number + " doing : {}", data);
        sleep(ms);
        log.info("subscribe " + number + " done : {}", data);
    });
}
  • subscribe 1 과 2 가 먼저 Thread 를 할당받는다.
    • newBoundedElastic() 는 2개의 큐를 갖는 2개의 Thread 를 생성한다.
  • 더이상 Thread 가 없기 때문에 sub3 부터는 큐에서 대기한다.
    • 이 때 큐의 size 또한 2개로 설정했기 때문에 sub 이 하나 더 추가될 경우 오류가 발생하게 된다.
  • 3초 후 sub 1 과 2의 작업이 완료되고 Q 에서 대기중인 작업들이 순차적으로 실행된다.
  • newBoundedElastic() 가 생성한 Thread 는 데몬 Thread 가 아닌 유저 Thread 로 작업이 완료되어도 일정시간동안 앱이 종료되지 않는다.
[my-Thread-2] INFO -- subscribe 2 doing : 1
[my-Thread-1] INFO -- subscribe 1 doing : 1
[my-Thread-2] INFO -- subscribe 2 done : 1
[my-Thread-1] INFO -- subscribe 1 done : 1

[my-Thread-1] INFO -- subscribe 3 doing : 1
[my-Thread-2] INFO -- subscribe 4 doing : 1
[my-Thread-1] INFO -- subscribe 5 doing : 1
[my-Thread-2] INFO -- subscribe 6 doing : 1
profile
잘못된 내용 PR 환영

0개의 댓글