Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
log.info("next : {}", i);
sink.next(i);
}
})
.publishOn(Schedulers.newSingle("single"))
.doOnNext(item -> {
log.info("doOnNext: {}",item);
})
.publishOn(Schedulers.boundedElastic())
.doOnNext(item -> {
log.info("doOnNext2: {}",item);
})
.subscribeOn(Schedulers.newSingle("my-main"))
.subscribe(value -> {
log.info("value: " + value);
});
Flux.create(sink -> {...})
:
Flux.create()
는 개발자가 프로그래밍적으로 데이터를 방출할 수 있는 방법을 제공합니다. 여기서 sink
객체를 통해 데이터 항목을 방출합니다.for
루프를 사용해 0부터 4까지 숫자를 순차적으로 방출합니다. 각 숫자 방출 시, 로그를 찍어 현재 방출되는 데이터를 추적합니다..publishOn(Schedulers.newSingle("single"))
:
publishOn
은 스트림의 처리를 지정된 스케줄러로 전환합니다. 여기서는 "single"
라는 이름의 싱글 스레드 스케줄러를 생성해 사용합니다.doOnNext
이후의 연산)가 "single"
스케줄러에서 실행됨을 의미합니다..doOnNext(item -> {...})
:
doOnNext
는 스트림의 각 데이터 항목이 처리될 때 실행되는 콜백입니다. 여기서는 각 아이템을 로그로 출력합니다..publishOn(Schedulers.boundedElastic())
:
publishOn
은 스트림의 처리를 boundedElastic
스케줄러로 전환합니다. 이 스케줄러는 여러 스레드를 통해 작업을 분산시킬 수 있습니다..subscribeOn(Schedulers.newSingle("my-main"))
:
subscribeOn
은 스트림의 구독 자체가 실행될 스케줄러를 지정합니다. 여기서는 "my-main"
이라는 싱글 스레드 스케줄러에서 구독 로직이 실행됩니다..subscribe(value -> {...})
:
subscribe
는 스트림에 최종적으로 구독하고, 방출된 각 값에 대한 처리를 정의합니다. 여기서는 구독한 값이 로그로 출력됩니다.핸들링의 다양한 방법
// Flux 에러 처리
Flux.error(new ArithmeticException("ArithmeticException!!!"))
.doOnError(e -> log.info("doOnError: {}", e.getMessage()))
.onErrorMap(e -> new MyException("MyException!!!"))
.subscribe(null, e -> {
log.info("error:" + e.getMessage());
});
// Mono 에러 처리
Mono.error(new ArithmeticException("ArithmeticException!!!"))
.onErrorReturn(0)
.subscribe(v -> {
log.info("return : {}", v);
});
Mono.error(new ArithmeticException("ArithmeticException!!!"))
.onErrorComplete()
.subscribe(null, null, () -> log.info("complete"));
Flux.error(...).doOnError(...).onErrorMap(...).subscribe(...)
:
Flux.error
는 의도적으로 에러를 발생시키는 예제입니다. 이를 통해 에러 처리 메커니즘을 살펴볼 수 있습니다.doOnError
는 에러 발생 시 로그를 찍습니다.onErrorMap
은 발생한 에러를 다른 타입의 에러로 변환합니다. 여기서는 ArithmeticException
을 MyException
으로 변환합니다.subscribe
의 두 번째 파라미터에서 변환된 에러를 처리합니다.Mono.error(...).onErrorReturn(...).subscribe(...)
:
Mono.error
로 에러를 발생시키고, onErrorReturn
을 사용하여 에러가 발생했을 때 반환할 기본값을 지정합니다.subscribe
에서 반환된 값을 로그로 출력합니다.Mono.error(...).onErrorComplete(...).subscribe(...)
:
Mono.error
예제에서는 onErrorComplete
을 사용하여 에러를 무시하고 스트림을 정상적으로 완료하도록 합니다.subscribe
의 세 번째 파라미터에서 완료 로직을 정의합니다.concat
사용하기Flux<Integer> flux1 = Flux.range(1, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe1");
})
.delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.range(10, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe2");
})
.delayElements(Duration.ofMillis(100));
Flux.concat(flux1, flux2)
.doOnNext(value -> log.info("doOnNext: " + value))
.subscribe();
Thread.sleep(1000);
concat
의 특징:Flux.concat()
은 여러 스트림을 순차적으로 결합합니다. 첫 번째 스트림이 완료된 후에 두 번째 스트림이 시작됩니다.flux1
이 완전히 처리된 후 flux2
가 처리됩니다. 즉, 1, 2, 3의 숫자가 방출된 후에 10, 11, 12가 방출됩니다.merge
사용하기Flux<Integer> flux1 = Flux.range(1, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe1");
})
.delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.range(10, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe2");
})
.delayElements(Duration.ofMillis(100));
Flux.merge(flux1, flux2)
.doOnNext(value -> log.info("doOnNext: " + value))
.subscribe();
Thread.sleep(1000);
merge
의 특징:Flux.merge()
는 여러 스트림을 병렬로 결합합니다. 스트림 간의 순서는 보장되지 않습니다.flux1
과 flux2
가 병렬로 실행되며, 데이터는 각 스트림에서 생성되는 순서대로 방출됩니다. 따라서 1, 10, 2, 11, 3, 12와 같은 순서로 데이터가 방출될 수 있습니다.mergeSequential
사용하기Flux<Integer> flux1 = Flux.range(1, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe1");
})
.delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.range(10, 3)
.doOnSubscribe(value -> {
log.info("doOnSubscribe2");
})
.delayElements(Duration.ofMillis(100));
Flux.mergeSequential(flux1, flux2)
.doOnNext(value -> log.info("doOnNext: " + value))
.subscribe();
Thread.sleep(1000);
mergeSequential
의 특징:Flux.mergeSequential()
은 merge
와 비슷하지만, 스트림을 제공된 순서대로 처리합니다. 즉, 첫 번째 스트림의 모든 데이터가 방출된 후에 두 번째 스트림이 처리됩니다.concat
과 유사한 결과를 제공하지만, 내부적으로는 병렬 처리가 가능하여 효율적일 수 있습니다.flux1
의 데이터 처리가 완료된 후 flux2
의 데이터 처리가 시작됩니다.다양한 스트림 처리 메소드를 활용하여 스트림 처리의 흐름을 어떻게 제어할 수 있는지 보여줍니다. schedule을 이용하면 각 단계에서의 데이터 처리를 추적할 수 있으며, 이를 통해 비동기 및 병렬 처리의 복잡성을 효과적으로 관리할 수 있습니다.
concat
, merge
, mergeSequential
은 각각 다른 시나리오에 적합한 스트림
조합 방법을 제공합니다. concat
은 순차 처리가 필요할 때, merge
는 스트림 간의 순서가 중요하지 않은 병렬 처리에 적합하며, mergeSequential
은 순서를 유지하면서도 병렬 처리의 이점을 취하고자 할 때 사용할 수 있습니다.(concat
과 merge
의 특성을 둘다 살린) Reactor를 활용함으로써 복잡한 데이터 스트림을 효율적으로 관리하고 조합할 수 있습니다.