A Scheduler is the manager of the threads used in a Reactor Sequence.
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
// Keep main alive long enough for the daemon worker thread to finish logging.
Thread.sleep(500L);
/*
23:23:16.452 [main] INFO - # doOnSubscribe
23:23:16.455 [boundedElastic-1] INFO - # doOnNext: 1
23:23:16.456 [boundedElastic-1] INFO - # onNext: 1
23:23:16.456 [boundedElastic-1] INFO - # doOnNext: 3
23:23:16.456 [boundedElastic-1] INFO - # onNext: 3
23:23:16.456 [boundedElastic-1] INFO - # doOnNext: 5
23:23:16.456 [boundedElastic-1] INFO - # onNext: 5
23:23:16.456 [boundedElastic-1] INFO - # doOnNext: 7
23:23:16.456 [boundedElastic-1] INFO - # onNext: 7
*/
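The thread names in the log above can also be checked in code rather than read from log output. The following is a minimal sketch, assuming reactor-core is on the classpath; the class and method names are made up for illustration, and block() is used in place of Thread.sleep() so the caller waits for the sequence to complete.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class SubscribeOnThreadCheck {

    // Collects the name of the thread that processed each element.
    public static List<String> emitThreadNames() {
        return Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .map(data -> Thread.currentThread().getName())
                .collectList()
                .block(); // waits for completion instead of sleeping
    }

    public static void main(String[] args) {
        // Each name is expected to start with "boundedElastic-".
        System.out.println(emitThreadNames());
    }
}
```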
publishOn() switches the execution thread of everything downstream of it.
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
/*
23:31:21.160 [main] INFO - # doOnSubscribe
23:31:21.166 [main] INFO - # doOnNext: 1
23:31:21.168 [main] INFO - # doOnNext: 3
23:31:21.168 [parallel-1] INFO - # onNext: 1
23:31:21.168 [main] INFO - # doOnNext: 5
23:31:21.168 [main] INFO - # doOnNext: 7
23:31:21.172 [parallel-1] INFO - # onNext: 3
23:31:21.172 [parallel-1] INFO - # onNext: 5
23:31:21.173 [parallel-1] INFO - # onNext: 7
*/
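To make the boundary created by publishOn() explicit, the sketch below records one thread name above the operator and one below it. It assumes reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class PublishOnBoundary {

    // Returns {thread seen upstream of publishOn, thread seen downstream of it}.
    public static String[] upstreamAndDownstreamThreads() {
        AtomicReference<String> upstream = new AtomicReference<>();
        String downstream = Flux.fromArray(new Integer[] {1, 3, 5, 7})
                // Above publishOn(): still on the subscribing thread.
                .doOnNext(data -> upstream.set(Thread.currentThread().getName()))
                .publishOn(Schedulers.parallel())
                // Below publishOn(): on a worker of the parallel Scheduler.
                .map(data -> Thread.currentThread().getName())
                .blockLast();
        return new String[] {upstream.get(), downstream};
    }

    public static void main(String[] args) {
        String[] threads = upstreamAndDownstreamThreads();
        System.out.println("upstream=" + threads[0] + ", downstream=" + threads[1]);
    }
}
```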
parallel()
runOn()
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
/*
23:40:22.340 [parallel-1] INFO - # onNext: 1
23:40:22.340 [parallel-4] INFO - # onNext: 7
23:40:22.341 [parallel-3] INFO - # onNext: 5
23:40:22.340 [parallel-2] INFO - # onNext: 3
23:40:22.344 [parallel-3] INFO - # onNext: 13
23:40:22.345 [parallel-4] INFO - # onNext: 15
23:40:22.345 [parallel-1] INFO - # onNext: 9
23:40:22.345 [parallel-2] INFO - # onNext: 11
23:40:22.345 [parallel-1] INFO - # onNext: 17
23:40:22.345 [parallel-2] INFO - # onNext: 19
*/
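In the example above, parallel() only divides the sequence into rails; runOn() is what assigns a Scheduler to run them. According to the Reactor reference guide, the work stays on the subscribing thread when runOn() is omitted. The sketch below, with hypothetical class and method names and assuming reactor-core is on the classpath, lets the two cases be compared.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class ParallelRailsCheck {

    // Collects the thread name that handled each element, with or without runOn().
    public static List<String> threadNames(boolean useRunOn) {
        ParallelFlux<Integer> rails = Flux
                .fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
                .parallel(4); // split into 4 rails
        if (useRunOn) {
            rails = rails.runOn(Schedulers.parallel()); // run each rail on a worker
        }
        return rails
                .map(data -> Thread.currentThread().getName())
                .sequential() // merge the rails back into a single Flux
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("with runOn:    " + threadNames(true));
        System.out.println("without runOn: " + threadNames(false));
    }
}
```

How many distinct parallel-N threads appear depends on the pool size of Schedulers.parallel(), which is tied to the number of CPU cores.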
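The publishOn() operator can be used more than once.
Using the subscribeOn() and publishOn() operators together lets you separate the thread on which the source Publisher emits data from the thread that processes the emitted data.
Wherever subscribeOn() sits in the operator chain, it takes effect immediately after subscription.
Flux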
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
/*
19:33:03.233 [main] INFO - # doOnNext fromArray: 1
19:33:03.234 [main] INFO - # doOnNext fromArray: 3
19:33:03.234 [main] INFO - # doOnNext fromArray: 5
19:33:03.234 [main] INFO - # doOnNext filter: 5
19:33:03.234 [main] INFO - # doOnNext map: 50
19:33:03.235 [main] INFO - # onNext: 50
19:33:03.235 [main] INFO - # doOnNext fromArray: 7
19:33:03.235 [main] INFO - # doOnNext filter: 7
19:33:03.235 [main] INFO - # doOnNext map: 70
19:33:03.235 [main] INFO - # onNext: 70
*/
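The log above shows every operator on main because no Scheduler was specified, so the whole chain runs on the thread that subscribes. A small sketch of that baseline follows, assuming reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of the original notes.
public class NoSchedulerCheck {

    // True if every element was processed on the thread that subscribed.
    public static boolean runsOnCallerThread() {
        String caller = Thread.currentThread().getName();
        List<String> names = Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .filter(data -> data > 3)
                .map(data -> data * 10)
                .map(data -> Thread.currentThread().getName())
                .collectList()
                .block(); // block() subscribes on the calling thread
        return names.stream().allMatch(caller::equals);
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerThread());
    }
}
```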
publishOn() changes the execution thread of the operators below it.
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
/*
19:36:14.638 [main] INFO - # doOnNext fromArray: 1
19:36:14.639 [main] INFO - # doOnNext fromArray: 3
19:36:14.639 [main] INFO - # doOnNext fromArray: 5
19:36:14.639 [main] INFO - # doOnNext fromArray: 7
19:36:14.639 [parallel-1] INFO - # doOnNext filter: 5
19:36:14.640 [parallel-1] INFO - # doOnNext map: 50
19:36:14.640 [parallel-1] INFO - # onNext: 50
19:36:14.640 [parallel-1] INFO - # doOnNext filter: 7
19:36:14.640 [parallel-1] INFO - # doOnNext map: 70
19:36:14.640 [parallel-1] INFO - # onNext: 70
*/
publishOn() changes the execution thread of the operators below it until the next publishOn() is encountered.
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
/*
19:37:54.802 [main] INFO - # doOnNext fromArray: 1
19:37:54.803 [main] INFO - # doOnNext fromArray: 3
19:37:54.803 [main] INFO - # doOnNext fromArray: 5
19:37:54.803 [main] INFO - # doOnNext fromArray: 7
19:37:54.803 [parallel-2] INFO - # doOnNext filter: 5
19:37:54.804 [parallel-2] INFO - # doOnNext filter: 7
19:37:54.804 [parallel-1] INFO - # doOnNext map: 50
19:37:54.804 [parallel-1] INFO - # onNext: 50
19:37:54.804 [parallel-1] INFO - # doOnNext map: 70
19:37:54.804 [parallel-1] INFO - # onNext: 70
*/
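Because both stages above use Schedulers.parallel(), the boundary is visible only through the worker number. The sketch below swaps in two separately named Schedulers so that each stage is recognizable by its thread name. The names "first" and "second", like the class and method names, are assumptions made for illustration; reactor-core is assumed to be on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class TwoPublishOnStages {

    // Tags each element with the thread that ran the filter stage and the map stage.
    public static List<String> stageTrace() {
        Scheduler first = Schedulers.newParallel("first", 1);
        Scheduler second = Schedulers.newParallel("second", 1);
        try {
            return Flux.fromArray(new Integer[] {1, 3, 5, 7})
                    .publishOn(first)
                    .filter(data -> data > 3)
                    .map(data -> data + " filter@" + Thread.currentThread().getName())
                    .publishOn(second) // everything below runs on "second"
                    .map(text -> text + " map@" + Thread.currentThread().getName())
                    .collectList()
                    .block();
        } finally {
            // Schedulers created with newParallel() should be disposed by the caller.
            first.dispose();
            second.dispose();
        }
    }

    public static void main(String[] args) {
        stageTrace().forEach(System.out::println);
    }
}
```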
subscribeOn() specifies the thread to run on immediately after subscription. The thread does not change until a publishOn() is encountered.
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
/*
19:40:02.915 [boundedElastic-1] INFO - # doOnNext fromArray: 1
19:40:02.916 [boundedElastic-1] INFO - # doOnNext fromArray: 3
19:40:02.917 [boundedElastic-1] INFO - # doOnNext fromArray: 5
19:40:02.917 [boundedElastic-1] INFO - # doOnNext filter: 5
19:40:02.917 [boundedElastic-1] INFO - # doOnNext fromArray: 7
19:40:02.917 [parallel-1] INFO - # doOnNext map: 50
19:40:02.917 [boundedElastic-1] INFO - # doOnNext filter: 7
19:40:02.917 [parallel-1] INFO - # onNext: 50
19:40:02.917 [parallel-1] INFO - # doOnNext map: 70
19:40:02.917 [parallel-1] INFO - # onNext: 70
*/
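The example above places subscribeOn() right after the source, but its position in the chain should not matter for where the source emits. The following sketch moves it to the very end of the chain and records the thread seen at the source. It assumes reactor-core is on the classpath, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class SubscribeOnPosition {

    // Returns the thread on which the source emitted, with subscribeOn() placed last.
    public static String sourceThread() {
        AtomicReference<String> source = new AtomicReference<>();
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .doOnNext(data -> source.set(Thread.currentThread().getName()))
                .filter(data -> data > 3)
                .map(data -> data * 10)
                .subscribeOn(Schedulers.boundedElastic()) // last operator in the chain
                .blockLast();
        return source.get();
    }

    public static void main(String[] args) {
        System.out.println(sourceThread());
    }
}
```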
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.immediate())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
/*
19:44:55.012 [parallel-1] INFO - # doOnNext filter: 5
19:44:55.013 [parallel-1] INFO - # doOnNext map: 50
19:44:55.014 [parallel-1] INFO - # onNext: 50
19:44:55.014 [parallel-1] INFO - # doOnNext filter: 7
19:44:55.014 [parallel-1] INFO - # doOnNext map: 70
19:44:55.014 [parallel-1] INFO - # onNext: 70
*/
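In the log above, the second publishOn() uses Schedulers.immediate(), and the map stage stays on parallel-1. Schedulers.immediate() runs work on the current thread instead of switching to another one. The sketch below shows this both with a directly scheduled task and inside a chain; the class and method names are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class ImmediateSchedulerCheck {

    // True if a task handed to Schedulers.immediate() ran on the calling thread.
    public static boolean runsOnCallerThread() {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> worker = new AtomicReference<>();
        Schedulers.immediate().schedule(() -> worker.set(Thread.currentThread()));
        return worker.get() == caller;
    }

    // Returns the thread seen below publishOn(Schedulers.immediate()).
    public static String threadAfterImmediate() {
        return Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .publishOn(Schedulers.parallel())
                .publishOn(Schedulers.immediate()) // no additional thread switch
                .map(data -> Thread.currentThread().getName())
                .blockLast();
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerThread());
        System.out.println(threadAfterImmediate());
    }
}
```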
doTask("task1")
.subscribe(data -> log.info("# onNext: {}", data));
doTask("task2")
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
private static Flux<Integer> doTask(String taskName) {
return Flux.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.single())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}
/*
19:46:07.861 [single-1] INFO - # task1 doOnNext filter: 5
19:46:07.863 [single-1] INFO - # task1 doOnNext map: 50
19:46:07.863 [single-1] INFO - # onNext: 50
19:46:07.863 [single-1] INFO - # task1 doOnNext filter: 7
19:46:07.863 [single-1] INFO - # task1 doOnNext map: 70
19:46:07.863 [single-1] INFO - # onNext: 70
19:46:07.863 [single-1] INFO - # task2 doOnNext filter: 5
19:46:07.864 [single-1] INFO - # task2 doOnNext map: 50
19:46:07.864 [single-1] INFO - # onNext: 50
19:46:07.864 [single-1] INFO - # task2 doOnNext filter: 7
19:46:07.864 [single-1] INFO - # task2 doOnNext map: 70
19:46:07.864 [single-1] INFO - # onNext: 70
*/
doTask("task1")
.subscribe(data -> log.info("# onNext: {}", data));
doTask("task2")
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
private static Flux<Integer> doTask(String taskName) {
return Flux.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.newSingle("new-single", true))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}
/*
19:46:50.784 [new-single-2] INFO - # task2 doOnNext filter: 5
19:46:50.784 [new-single-1] INFO - # task1 doOnNext filter: 5
19:46:50.786 [new-single-2] INFO - # task2 doOnNext map: 50
19:46:50.786 [new-single-1] INFO - # task1 doOnNext map: 50
19:46:50.786 [new-single-2] INFO - # onNext: 50
19:46:50.786 [new-single-1] INFO - # onNext: 50
19:46:50.786 [new-single-2] INFO - # task2 doOnNext filter: 7
19:46:50.786 [new-single-1] INFO - # task1 doOnNext filter: 7
19:46:50.786 [new-single-2] INFO - # task2 doOnNext map: 70
19:46:50.786 [new-single-1] INFO - # task1 doOnNext map: 70
19:46:50.786 [new-single-2] INFO - # onNext: 70
19:46:50.786 [new-single-1] INFO - # onNext: 70
*/
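The two logs above differ because Schedulers.single() returns a shared Scheduler backed by one reused thread, while each call to Schedulers.newSingle() creates a new Scheduler with its own thread. A minimal sketch of that difference follows, assuming reactor-core is on the classpath; the class and method names are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, not part of the original notes.
public class SingleVsNewSingle {

    // Returns the name of the thread the given Scheduler runs work on.
    public static String threadOf(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block();
    }

    // Two subscriptions on Schedulers.single() share the same thread.
    public static boolean sharedSingleReusesThread() {
        return threadOf(Schedulers.single()).equals(threadOf(Schedulers.single()));
    }

    // Each newSingle() call gets its own thread.
    public static boolean newSingleUsesSeparateThreads() {
        Scheduler a = Schedulers.newSingle("new-single", true);
        Scheduler b = Schedulers.newSingle("new-single", true);
        try {
            return !threadOf(a).equals(threadOf(b));
        } finally {
            // Schedulers created with newSingle() should be disposed by the caller.
            a.dispose();
            b.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(sharedSingleReusesThread());
        System.out.println(newSingleUsesSeparateThreads());
    }
}
```

The second argument of newSingle() marks the thread as a daemon, which is why the original example still needs Thread.sleep() to see the output before main exits.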