reactor operator 참조
: https://projectreactor.io/docs/core/release/reference/#which-operator
- 흐름
=>Publisher → [Data 1] → OP1 → [Data 2] → OP2 → [Data 3] → Subscriber
- WebFlux의 기반인 Reactor에서는
Flux
와Mono
라는Publisher
를 사용- Publisher에서 데이터를 변경하며, 새로운 Publisher / Subscriber로 생성하는 도구
- Flux 와 Mono의 많은 operator를 통해 데이터를 조작할 수 있다
ex)just()
/range()
/flatMap()
/groupBy()
등
시퀀스 생성 시 (Creating a New Sequence)
(
#
앞에Mono
혹은Flux
가 없다면둘다 가능
한 것)
- just
just(T data)
- null이 존재할 경우 T :
Mono#justOrEmpty(T data)
- iterate 관련
- array :
Flux#fromArray(T[] array)
- collection 또는 iterable :
Flux#fromIterable(Iterable it)
- 정수 범위 :
Flux#range(int start, int count)
- 각 Subscription에 제공된 Stream :
Flux#fromStream(Supplier<Stream>)
기존 시퀀스를 변경하는 경우(Transforming an Existing Sequence)
- 기존 데이터 변경
- 1 to 1 :
Flux#map()
- 1 to N
- 순서 보장 X :
Flux#flatMap()
- 순서 보장 O :
Flux#flatMapSequential()
/Flux#concatMap()
- Flux를 합치고 싶은 경우
- list로 :
Flux#collectList()
/Flux#collectSortedList()
(Mono<List>로 반환
)- map으로 :
Flux#collectMap()
/Flux#collectMultiMap()
- 각 element 사이에 function 적용 :
reduce()
- sequence 수 :
count()
- 방출된 순서대로 결합 (기존 순서 X)
- 같은 타입 :
Flux#merge()
- 다른 타입으로 변환 :
Flux#zip()
/Flux#zipWith()
시퀀스 엿보기(Peeking into a Sequence)
- 최종 sequence를 수정하지 않고 다음을 수행
- 방출 시 :
doOnNext()
- 완료 시 :
Flux#doOnComplete()
,Mono#doOnSuccess()
- 에러 종료 시 :
doOnError()
- 취소시 :
doOnCancel()
- subscription 이후 :
doOnSubscribe()
- 요청 시 :
doOnRequest()
- 로그를 보고 싶은 경우 :
log()
시퀀스 필터링 (Filtering a Sequence)
- 중복을 무시
- 전체 sequence에 대해 :
Flux#distinct()
- 이후 방출된 item에 대해 :
Flux#distinctUntillChanged()
- 주어진 Predicate에 대해
- 동기 실행 :
filter()
- 비동기 실행 :
filterWhen()
비동기 실행
- 개요
WebFlux
도 아무런 동작을 하지 않으면 동기적으로 스트림을 처리 ->하나의 스레드
Scheduler
를 통해서 동시성(병렬처리
)과 실행 순서를 적절하게 관리할 수 있다
subscribeOn(Scheduler scheduler)
- 스트림을 구독할 때 동작을 지정한 스레드 풀에서 수행할 수 있도록 설정하는 operator
- 즉, publisher가 subscribe() 하는 과정을 별도의 스레드에서 비동기적으로 처리
Flux.subscribeOn(Schedulers.single()).subscribe()
=> 지정한 하나의 스레드에서 구독을 수행Flux.subscribeOn(Schedulers.parallel()).subscribe()
=> 최대 CPU Core 개수 만큼의 worker를 만들어서 병렬로 처리
publishOn(Scheduler Scheduler)
subscriber
가 처리하는 부분인onNext()
/onComplete()
/onError()
를 지정한 스데르 풀에서 수행onSubscribe()
/request()
는 별도의 설정이 없으면기존 스레드
에서 수행이후 더 많은 operator는 Mono / Flux docs에서 확인
Cold Publisher
- 기본적으로 특별하게 Hot을 취급하는 연산자(
operator
)가 아닌 이상Flux
,Mono
는Cold
로 동작- 각 구독(
subscribe
)에 대해 항상데이터를 새로 생성
- (
중요
) 구독하지 않으면 데이터가 생성되지 X
Hot Publisher
- 구독(
subscribe
)하기 전에데이터 스트림
을동작
하게 할 수 있다- 구독자 수에 의존하지 않고,
즉시 데이터 게시
를 시작할 수 있다반복적으로 발생하는 데이터 스트림
을미리 동작
해서 공유하면,중복을 방지
할 수 있다
=>Flux.subscribeOn(Schedulers.single())
을 통해서하나의 스레드
에서 수행하게 가능
=> 그리고,Flux.publishOn()
을 통해서 각구독자 동작
을별도의 스레드
에서 수행하면,효율적인 병렬 처리
가 가능!
Cold -> Hot 변환
Hot Publisher
로 변환하려면,ConnectableFlux
로 변환하면 된다
Flux#publish()
- Flux -> ConnectableFlux 로 변환하여 반환
ConnectableFlux#autoConnect()
- 최소 구독 개수를 만족하면 자동으로 connect()를 호출하는 역할
ConnectableFlux#refCount()
- 구독자 수를 세서, 하나도 없으면 기존 소스의 스트림 구독을 해제하는 역할
- 소스(source)가 무한으로 값을 생성할 때 구독을 막기 위한 용도로 사용
Flux.publish().refCount()
를 추상화한 operator가 =>Flux.share()
@GetMapping("/fruit") Flux<FruitInfo> getFruit(){ /* 3개의 List<String> 데이터 준비 */ final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"}); final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"}); final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"}); final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3); /* List -> Flux */ final Flux<List<String>> basketFlux = Flux.fromIterable(baskets); /* concatMap() 을 통해 하나의 stream씩 순차적으로 실행 */ Flux<FruitInfo> result = basketFlux.concatMap(basket -> { /* Hot Publisher인 source를 미리 만들어서 반복 작업을 최소화하고, 하나의 스레드에서 통일되게 사용되도고 Schedulers.single() 적용 */ final Flux<String> source = Flux.fromIterable(basket).publish().autoConnect(2).subscribeOn(Schedulers.single()); /* publishOn() 을 통해서 subscribe의 부분인 onNext() ~ onComplete() 까지를 별도의 스레드에서 수행*/ final Mono<List<String>> distinctFruits = source.publishOn(Schedulers.parallel()).distinct().collectList().log(); Mono<Map<String, Long>> countFruitsMono = source.publishOn(Schedulers.parallel()) .groupBy(fruit -> fruit) .concatMap(groupFlux -> groupFlux.count() .map(count -> { final Map<String, Long> fruitCount = new LinkedHashMap<>(); fruitCount.put(groupFlux.key(), count); return fruitCount; }) ) .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {{ putAll(accumulatedMap); putAll(currentMap); }}).log(); /* Flux.zip() 을 통해서 새로운 타입을 가지는 하나의 객체로 합친다 */ return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count)); }).log(); return result; }