Reactor를 활용한 데이터 스트림 처리

greenTea·2024년 1월 16일
0

Reactor를 활용한 데이터 스트림 처리

예제 1: 스트림 처리

Flux.create(sink -> {
    for (int i = 0; i < 5; i++) {
        log.info("next : {}", i);
        sink.next(i);
    }
})
.publishOn(Schedulers.newSingle("single"))
.doOnNext(item -> {
    log.info("doOnNext: {}",item);
})
.publishOn(Schedulers.boundedElastic())
.doOnNext(item -> {
    log.info("doOnNext2: {}",item);
})
.subscribeOn(Schedulers.newSingle("my-main"))
.subscribe(value -> {
    log.info("value: " + value);
});

코드 분석:

  1. Flux.create(sink -> {...}):

    • Flux.create()는 개발자가 프로그래밍적으로 데이터를 방출할 수 있는 방법을 제공합니다. 여기서 sink 객체를 통해 데이터 항목을 방출합니다.
    • for 루프를 사용해 0부터 4까지 숫자를 순차적으로 방출합니다. 각 숫자 방출 시, 로그를 찍어 현재 방출되는 데이터를 추적합니다.
  2. .publishOn(Schedulers.newSingle("single")):

    • publishOn은 스트림의 처리를 지정된 스케줄러로 전환합니다. 여기서는 "single"라는 이름의 싱글 스레드 스케줄러를 생성해 사용합니다.
    • 이것은 데이터의 처리(즉, doOnNext 이후의 연산)가 "single" 스케줄러에서 실행됨을 의미합니다.
  3. .doOnNext(item -> {...}):

    • doOnNext는 스트림의 각 데이터 항목이 처리될 때 실행되는 콜백입니다. 여기서는 각 아이템을 로그로 출력합니다.
  4. .publishOn(Schedulers.boundedElastic()):

    • 두 번째 publishOn은 스트림의 처리를 boundedElastic 스케줄러로 전환합니다. 이 스케줄러는 여러 스레드를 통해 작업을 분산시킬 수 있습니다.
  5. .subscribeOn(Schedulers.newSingle("my-main")):

    • subscribeOn은 스트림의 구독 자체가 실행될 스케줄러를 지정합니다. 여기서는 "my-main"이라는 싱글 스레드 스케줄러에서 구독 로직이 실행됩니다.
  6. .subscribe(value -> {...}):

    • subscribe는 스트림에 최종적으로 구독하고, 방출된 각 값에 대한 처리를 정의합니다. 여기서는 구독한 값이 로그로 출력됩니다.

예제 2: 에러

핸들링의 다양한 방법

// Flux 에러 처리
Flux.error(new ArithmeticException("ArithmeticException!!!"))
    .doOnError(e -> log.info("doOnError: {}", e.getMessage()))
    .onErrorMap(e -> new MyException("MyException!!!"))
    .subscribe(null, e -> {
        log.info("error:" + e.getMessage());
    });

// Mono 에러 처리
Mono.error(new ArithmeticException("ArithmeticException!!!"))
    .onErrorReturn(0)
    .subscribe(v -> {
        log.info("return : {}", v);
    });

Mono.error(new ArithmeticException("ArithmeticException!!!"))
    .onErrorComplete()
    .subscribe(null, null, () -> log.info("complete"));

코드 분석:

  1. Flux.error(...).doOnError(...).onErrorMap(...).subscribe(...):

    • Flux.error는 의도적으로 에러를 발생시키는 예제입니다. 이를 통해 에러 처리 메커니즘을 살펴볼 수 있습니다.
    • doOnError는 에러 발생 시 로그를 찍습니다.
    • onErrorMap은 발생한 에러를 다른 타입의 에러로 변환합니다. 여기서는 ArithmeticExceptionMyException으로 변환합니다.
    • subscribe의 두 번째 파라미터에서 변환된 에러를 처리합니다.
  2. Mono.error(...).onErrorReturn(...).subscribe(...):

    • Mono.error로 에러를 발생시키고, onErrorReturn을 사용하여 에러가 발생했을 때 반환할 기본값을 지정합니다.
    • subscribe에서 반환된 값을 로그로 출력합니다.
  3. Mono.error(...).onErrorComplete(...).subscribe(...):

    • 또 다른 Mono.error 예제에서는 onErrorComplete을 사용하여 에러를 무시하고 스트림을 정상적으로 완료하도록 합니다.
    • subscribe의 세 번째 파라미터에서 완료 로직을 정의합니다.

예제 3: concat 사용하기

Flux<Integer> flux1 = Flux.range(1, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe1");
    })
    .delayElements(Duration.ofMillis(100));

Flux<Integer> flux2 = Flux.range(10, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe2");
    })
    .delayElements(Duration.ofMillis(100));

Flux.concat(flux1, flux2)
    .doOnNext(value -> log.info("doOnNext: " + value))
    .subscribe();
Thread.sleep(1000);

concat의 특징:

  • Flux.concat()은 여러 스트림을 순차적으로 결합합니다. 첫 번째 스트림이 완료된 후에 두 번째 스트림이 시작됩니다.
  • 예제에서는 flux1이 완전히 처리된 후 flux2가 처리됩니다. 즉, 1, 2, 3의 숫자가 방출된 후에 10, 11, 12가 방출됩니다.

예제 4: merge 사용하기

Flux<Integer> flux1 = Flux.range(1, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe1");
    })
    .delayElements(Duration.ofMillis(100));

Flux<Integer> flux2 = Flux.range(10, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe2");
    })
    .delayElements(Duration.ofMillis(100));

Flux.merge(flux1, flux2)
    .doOnNext(value -> log.info("doOnNext: " + value))
    .subscribe();
Thread.sleep(1000);

merge의 특징:

  • Flux.merge()는 여러 스트림을 병렬로 결합합니다. 스트림 간의 순서는 보장되지 않습니다.
  • 이 예제에서는 flux1flux2가 병렬로 실행되며, 데이터는 각 스트림에서 생성되는 순서대로 방출됩니다. 따라서 1, 10, 2, 11, 3, 12와 같은 순서로 데이터가 방출될 수 있습니다.

예제 5: mergeSequential 사용하기

Flux<Integer> flux1 = Flux.range(1, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe1");
    })
    .delayElements(Duration.ofMillis(100));

Flux<Integer> flux2 = Flux.range(10, 3)
    .doOnSubscribe(value -> {
        log.info("doOnSubscribe2");
    })
    .delayElements(Duration.ofMillis(100));

Flux.mergeSequential(flux1, flux2)
    .doOnNext(value -> log.info("doOnNext: " + value))
    .subscribe();
Thread.sleep(1000);

mergeSequential의 특징:

  • Flux.mergeSequential()merge와 비슷하지만, 스트림을 제공된 순서대로 처리합니다. 즉, 첫 번째 스트림의 모든 데이터가 방출된 후에 두 번째 스트림이 처리됩니다.
  • 이 방식은 concat과 유사한 결과를 제공하지만, 내부적으로는 병렬 처리가 가능하여 효율적일 수 있습니다.
  • 예제에서는 flux1의 데이터 처리가 완료된 후 flux2의 데이터 처리가 시작됩니다.

결론

다양한 스트림 처리 메소드를 활용하여 스트림 처리의 흐름을 어떻게 제어할 수 있는지 보여줍니다. schedule을 이용하면 각 단계에서의 데이터 처리를 추적할 수 있으며, 이를 통해 비동기 및 병렬 처리의 복잡성을 효과적으로 관리할 수 있습니다.

concat, merge, mergeSequential은 각각 다른 시나리오에 적합한 스트림

조합 방법을 제공합니다. concat은 순차 처리가 필요할 때, merge는 스트림 간의 순서가 중요하지 않은 병렬 처리에 적합하며, mergeSequential은 순서를 유지하면서도 병렬 처리의 이점을 취하고자 할 때 사용할 수 있습니다.(concatmerge의 특성을 둘다 살린) Reactor를 활용함으로써 복잡한 데이터 스트림을 효율적으로 관리하고 조합할 수 있습니다.

profile
greenTea입니다.

0개의 댓글