✏️ Scheduler 의 종류
- 별도의 쓰레드를 추가 할당하지 않고, 현재 쓰레드에서 실행시킴
public void immediate() throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> print("filter : " + data))
.publishOn(Schedulers.immediate())
.map(data -> data * 10)
.doOnNext(data -> print("map : " + data))
.subscribe(Start::print);
Thread.sleep(500L);
}
- 첫번째 Thread 할당에서
parallel() 을 지정하고,
두번째에서 immediate() 을 사용해 기존 쓰레드를 계속 사용하도록 설정함
[parallel-1] INFO -- filter : 5
[parallel-1] INFO -- map : 50
[parallel-1] INFO -- sub : 50
[parallel-1] INFO -- filter : 7
[parallel-1] INFO -- map : 70
[parallel-1] INFO -- sub : 70
📍 single()
- 하나의 쓰레드를 생성해 Scheduler 가 제거될 때 까지 사용함
- low latency (저지연) 일회성 실행에 최적화 되어 있다.
public void single() throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.publishOn(Schedulers.single())
.filter(data -> data > 3)
.doOnNext(data -> print("filter : " + data))
.publishOn(Schedulers.single())
.map(data -> data * 10)
.doOnNext(data -> print("map : " + data))
.subscribe(Start::print);
Thread.sleep(500L);
}
immediate() 와 다르게 single() 이 처음 호출되면 새로운 Thread 를 생성하고,
한번 더 호출될 경우 새로 생성하지 않고 그대로 재사용 되고 있다.
[single-1] INFO -- filter : 5
[single-1] INFO -- filter : 7
[single-1] INFO -- map : 50
[single-1] INFO -- sub : 50
[single-1] INFO -- map : 70
[single-1] INFO -- sub : 70
📍 boundedElasic()
- 쓰레드 풀을 생성해 생성된 쓰레드를 재사용
- 생성할 수 있는 쓰레드 수에 제한이 있다.
- Blocking I/O 같이 긴 실행 시간을 가진 작업에 최적화
📍 parallel()
- 여러 쓰레드를 할당해 동시에 작업을 수행
- Non-Blocking I/O 작업에 최적화
- CPU 코어의 수 만큼 쓰레드를 생성할 수 있음
📍 fromExecutorService()
- 기존의 ExecutorService 를 사용해 쓰레드를 생성
- 의미있는 식별자를 제공하기 때문에 Metric 에서 주로 사용된다.
📍 new…()
- 다양한 유형의 새로운 Scheduler 를 생성할 수 있다.
- newSingle(), newParallel(), newBoundedElasic()
- 호출 할 때 마다 무조건 새로운 Thread 를 생성함
- 중혹 호출시 재활용 되는 single 도 new 를 붙이면 항상 Thread 를 생성함
newBoundedElastic() 으로 Thread 의 이름과 수를 설정할 수 있다.
public void newBoundedElastic() {
Scheduler scheduler = Schedulers.newBoundedElastic(2, 2, "my-Thread");
Mono<Integer> mono = Mono
.just(1)
.subscribeOn(scheduler);
for (int i = 1; i < 7; i++)
Long ms = i > 2 ? 0L : 3000L;
subscribe(mono, ms, i);
}
private void subscribe(Mono<Integer> mono, Long ms, int number) {
mono.subscribe(data -> {
log.info("subscribe " + number + " doing : {}", data);
sleep(ms);
log.info("subscribe " + number + " done : {}", data);
});
}
- subscribe 1 과 2 가 먼저 Thread 를 할당받는다.
newBoundedElastic() 는 2개의 큐를 갖는 2개의 Thread 를 생성한다.
- 더이상 Thread 가 없기 때문에 sub3 부터는 큐에서 대기한다.
- 이 때 큐의 size 또한 2개로 설정했기 때문에 sub 이 하나 더 추가될 경우 오류가 발생하게 된다.
- 3초 후 sub 1 과 2의 작업이 완료되고 Q 에서 대기중인 작업들이 순차적으로 실행된다.
newBoundedElastic() 가 생성한 Thread 는 데몬 Thread 가 아닌 유저 Thread 로 작업이 완료되어도 일정시간동안 앱이 종료되지 않는다.
[my-Thread-2] INFO -- subscribe 2 doing : 1
[my-Thread-1] INFO -- subscribe 1 doing : 1
[my-Thread-2] INFO -- subscribe 2 done : 1
[my-Thread-1] INFO -- subscribe 1 done : 1
[my-Thread-1] INFO -- subscribe 3 doing : 1
[my-Thread-2] INFO -- subscribe 4 doing : 1
[my-Thread-1] INFO -- subscribe 5 doing : 1
[my-Thread-2] INFO -- subscribe 6 doing : 1