WebFlux (2) - Operator

김정욱·2021년 10월 27일
0

WebFlux

목록 보기
5/5
post-thumbnail
post-custom-banner

reactor operator 참조
: https://projectreactor.io/docs/core/release/reference/#which-operator

Operator ?

[ 설명 ]

  • 흐름
    => Publisher → [Data 1] → OP1 → [Data 2] → OP2 → [Data 3] → Subscriber
  • WebFlux의 기반인 Reactor에서는 FluxMono라는 Publisher를 사용
  • Publisher에서 데이터를 변경하며, 새로운 Publisher / Subscriber로 생성하는 도구
  • Flux 와 Mono의 많은 operator를 통해 데이터를 조작할 수 있다
    ex) just() / range() / flatMap() / groupBy()

[ 분류 ]

시퀀스 생성 시 (Creating a New Sequence)

(# 앞에 Mono 혹은 Flux가 없다면 둘다 가능한 것)

  • just
    • just(T data)
    • null이 존재할 경우 T : Mono#justOrEmpty(T data)
  • iterate 관련
    • array : Flux#fromArray(T[] array)
    • collection 또는 iterable : Flux#fromIterable(Iterable it)
    • 정수 범위 : Flux#range(int start, int count)
    • 각 Subscription에 제공된 Stream : Flux#fromStream(Supplier<Stream>)

기존 시퀀스를 변경하는 경우(Transforming an Existing Sequence)

  • 기존 데이터 변경
    • 1 to 1 : Flux#map()
    • 1 to N
      • 순서 보장 X : Flux#flatMap()
      • 순서 보장 O : Flux#flatMapSequential() / Flux#concatMap()
  • Flux를 합치고 싶은 경우
    • list로 : Flux#collectList() / Flux#collectSortedList()
      (Mono<List>로 반환)
    • map으로 : Flux#collectMap() / Flux#collectMultiMap()
    • 각 element 사이에 function 적용 : reduce()
    • sequence 수 : count()
    • 방출된 순서대로 결합 (기존 순서 X)
      • 같은 타입 : Flux#merge()
      • 다른 타입으로 변환 : Flux#zip() / Flux#zipWith()

시퀀스 엿보기(Peeking into a Sequence)

  • 최종 sequence를 수정하지 않고 다음을 수행
    • 방출 시 : doOnNext()
    • 완료 시 : Flux#doOnComplete(), Mono#doOnSuccess()
    • 에러 종료 시 : doOnError()
    • 취소시 : doOnCancel()
    • subscription 이후 : doOnSubscribe()
    • 요청 시 : doOnRequest()
    • 로그를 보고 싶은 경우 : log()

시퀀스 필터링 (Filtering a Sequence)

  • 중복을 무시
    • 전체 sequence에 대해 : Flux#distinct()
    • 이후 방출된 item에 대해 : Flux#distinctUntillChanged()
  • 주어진 Predicate에 대해
    • 동기 실행 : filter()
    • 비동기 실행 : filterWhen()

비동기 실행

  • 개요
    • WebFlux도 아무런 동작을 하지 않으면 동기적으로 스트림을 처리 -> 하나의 스레드
    • Scheduler를 통해서 동시성(병렬처리)과 실행 순서를 적절하게 관리할 수 있다
  • subscribeOn(Scheduler scheduler)
    • 스트림을 구독할 때 동작을 지정한 스레드 풀에서 수행할 수 있도록 설정하는 operator
    • 즉, publisher가 subscribe() 하는 과정을 별도의 스레드에서 비동기적으로 처리
    • Flux.subscribeOn(Schedulers.single()).subscribe()
      => 지정한 하나의 스레드에서 구독을 수행
    • Flux.subscribeOn(Schedulers.parallel()).subscribe()
      => 최대 CPU Core 개수 만큼의 worker를 만들어서 병렬로 처리
  • publishOn(Scheduler Scheduler)
    • subscriber가 처리하는 부분인 onNext() / onComplete() / onError() 를 지정한 스데르 풀에서 수행
    • onSubscribe() / request() 는 별도의 설정이 없으면 기존 스레드에서 수행

이후 더 많은 operator는 Mono / Flux docs에서 확인

[ Cold / Hot Publisher ]

Cold Publisher

  • 기본적으로 특별하게 Hot을 취급하는 연산자(operator)가 아닌 이상 Flux, MonoCold로 동작
  • 각 구독(subscribe)에 대해 항상 데이터를 새로 생성
  • (중요) 구독하지 않으면 데이터가 생성되지 X

Hot Publisher

  • 구독(subscribe)하기 전에 데이터 스트림동작하게 할 수 있다
  • 구독자 수에 의존하지 않고, 즉시 데이터 게시를 시작할 수 있다
  • 반복적으로 발생하는 데이터 스트림미리 동작해서 공유하면, 중복을 방지할 수 있다
    => Flux.subscribeOn(Schedulers.single()) 을 통해서 하나의 스레드에서 수행하게 가능
    => 그리고, Flux.publishOn() 을 통해서 각 구독자 동작별도의 스레드에서 수행하면, 효율적인 병렬 처리가 가능!

Cold -> Hot 변환

  • Hot Publisher로 변환하려면, ConnectableFlux로 변환하면 된다
  • Flux#publish()
    • Flux -> ConnectableFlux 로 변환하여 반환
  • ConnectableFlux#autoConnect()
    • 최소 구독 개수를 만족하면 자동으로 connect()를 호출하는 역할
  • ConnectableFlux#refCount()
    • 구독자 수를 세서, 하나도 없으면 기존 소스의 스트림 구독을 해제하는 역할
    • 소스(source)가 무한으로 값을 생성할 때 구독을 막기 위한 용도로 사용
    • Flux.publish().refCount()를 추상화한 operator가 => Flux.share()

[ 사용 예시 ]

@GetMapping("/fruit")
  Flux<FruitInfo> getFruit(){
      /* 3개의 List<String> 데이터 준비 */
      final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"});
      final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"});
      final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"});
      final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3);
      /* List -> Flux */
      final Flux<List<String>> basketFlux = Flux.fromIterable(baskets);

      /* concatMap() 을 통해 하나의 stream씩 순차적으로 실행 */
      Flux<FruitInfo> result = basketFlux.concatMap(basket -> {
          /* Hot Publisher인 source를 미리 만들어서 반복 작업을 최소화하고, 하나의 스레드에서 통일되게 사용되도고 Schedulers.single() 적용 */
          final Flux<String> source = Flux.fromIterable(basket).publish().autoConnect(2).subscribeOn(Schedulers.single());
          /* publishOn() 을 통해서 subscribe의 부분인 onNext() ~ onComplete() 까지를 별도의 스레드에서 수행*/
          final Mono<List<String>> distinctFruits = source.publishOn(Schedulers.parallel()).distinct().collectList().log();
          Mono<Map<String, Long>> countFruitsMono = source.publishOn(Schedulers.parallel())
                  .groupBy(fruit -> fruit)
                  .concatMap(groupFlux -> groupFlux.count()
                          .map(count -> {
                              final Map<String, Long> fruitCount = new LinkedHashMap<>();
                              fruitCount.put(groupFlux.key(), count);
                              return fruitCount;
                          })
                  )
                  .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {{
                      putAll(accumulatedMap);
                      putAll(currentMap);
                  }}).log();
                  /* Flux.zip() 을 통해서 새로운 타입을 가지는 하나의 객체로 합친다 */
          return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count));
       }).log();
      return result;
  }
profile
Developer & PhotoGrapher
post-custom-banner

0개의 댓글