
map, flatMap, concat, merge, zip: Flux 흐름을 변환하거나 결합.and: 완료 신호만 모아서 후처리할 때 사용.collectList, collectMap: 전체 결과를 하나로 모아 Mono로 변환할 때 사용.| 오퍼레이터 | 주요 기능 설명 | 반환 타입 | 특징 및 동작 요약 |
|---|---|---|---|
map | 단건 데이터 변환 | Flux<T> → Flux<R> | 입력값에 함수 적용하여 변환 |
flatMap | 내부 시퀀스를 병합 | Flux<T> → Flux<R> | 여러 inner Flux로 분기 후 평탄화. 순서 보장 없음 |
concat | Flux 순차 연결 | Flux<T> | 앞 Flux가 완료되면 다음 Flux 연결 (순서 보장) |
merge | Flux 병렬 병합 | Flux<T> | 여러 Flux 동시에 subscribe (순서 보장 안됨) |
zip | 여러 Flux 병렬 결합 | Flux<TupleN<...>> | 각 Flux에서 하나씩 받아 묶어서 emit |
and | 완료 여부 동기화 | Mono<Void> | 모든 Publisher가 완료되면 onComplete emit |
collectList | 전체 수집 → List | Flux<T> → Mono<List<T>> | 전체 데이터를 List로 수집 |
collectMap | 전체 수집 → Map | Flux<T> → Mono<Map<K,V>> | Key-Value 매핑 후 Map으로 수집 |
map 오퍼레이터는 업스트림에서 emit된 데이터를 mapper function을 사용하여 변환한 후, 이를 다운스트림으로 emit한다.
map 오퍼레이터는 내부에서 에러가 발생하더라도 시퀀스를 종료하지 않고 계속 진행되도록 하는 기능을 지원한다.
Flux.just("a", "b", "c")
.map(String::toUpperCase)
.subscribe(data -> System.out.println("Data: " + data));
flatMap 오퍼레이터를 사용하면 업스트림에서 emit된 하나의 데이터가 Inner Sequence에서 여러 건의 데이터로 변환된다는 것을 알 수 있다.
업스트림에서 emit된 데이터는 각각 Inner Sequence로 들어가며, 이 Inner Sequence에서 변환된 데이터는 평탄화 과정을 거쳐 하나의 시퀀스로 합쳐져 downstream으로 emit된다.
Flux.just("a", "b", "c")
.flatMap(s1 -> Flux.just("1", "2", "3")
.publishOn(Schedulers.parallel())
.map(s2 -> s1 + " " + s2))
.subscribe(data -> System.out.println("Data flat: " + data));
업스트림에서 emit된 데이터 수 × Inner Sequence에서 emit되는 데이터 수만큼 최종적으로 subscriber에게 전달된다.
또한, Inner Sequence를 비동기적으로 실행할 경우 데이터 emit의 순서가 보장되지 않는다.
Data flat: b 1
Data flat: a 1
Data flat: a 2
Data flat: a 3
Data flat: c 1
Data flat: c 2
Data flat: c 3
Data flat: b 2
Data flat: b 3
concat 오퍼레이터는 입력된 Publisher들의 sequence를 순서대로 연결해서 데이터를 순차적으로 emit한다.
특히, 먼저 입력된 Publisher의 sequence가 완료될 때까지 나머지 Publisher는 구독되지 않고 대기한다.
Flux.concat(
Flux.just("A", "B", "C"),
Flux.just("D", "E", "F")
).subscribe(data -> System.out.println("Data: " + data));
Data: A
Data: B
Data: C
Data: D
Data: E
Data: F
merge 오퍼레이터는 파라미터로 입력된 Publisher들의 sequence를 인터리빙(interleaving) 방식으로 병합한다.
merge는 concat과는 달리 먼저 입력된 Publisher의 sequence가 종료될 때까지 대기하지 않으며, 모든 Publisher가 즉시 subscribe 된다.
Flux.merge(
Flux.just("A", "B", "C").delayElements(Duration.ofMillis(300L)),
Flux.just("D", "E", "F").delayElements(Duration.ofMillis(500L))
).subscribe(data -> System.out.println("Data: " + data));
Thread.sleep(2000L);
각 Publisher에서 emit되는 시간에 따라 빠르게 emit된 데이터가 먼저 subscriber에게 전달된다.
Data: A
Data: D
Data: B
Data: C
Data: E
Data: F
zip 오퍼레이터는 여러 Publisher가 emit한 데이터를 결합할 때 사용된다.
모든 Publisher에서 각각 하나씩 데이터를 emit할 때까지 기다렸다가, 이들을 묶어서 subscriber에게 전달한다.
Flux.zip(
Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L))
).subscribe(System.out::println);
Thread.sleep(25000L);
각 Flux에서 emit되는 시간이 다르더라도, zip은 쌍(pair)이 맞춰질 때까지 기다렸다가 emit하며, 결과는 Tuple2로 전달된다.
[1,4]
[2,5]
[3,6]
and() 오퍼레이터는 Mono의 Complete Signal과 파라미터로 전달된 Publisher의 Complete Signal을 결합하여, 새로운 Mono<Void>를 반환한다.
즉, Mono와 Flux가 모두 완료되었음을 알리는 신호만 emit하며, 실제 데이터는 전달하지 않는다.
Mono.just("Hello")
.delayElement(Duration.ofSeconds(1))
.doOnNext(data -> System.out.println("#doOnNext data: " + data))
.and(
Flux.just("World", "!")
.delayElements(Duration.ofSeconds(1))
.doOnNext(data -> System.out.println("inner #doOnNext data: " + data))
)
.subscribe(data -> System.out.println("#subscribe data: " + data),
error -> System.out.println("#subscribe error: " + error),
() -> System.out.println("#subscribe completed"));
Thread.sleep(5000);
#doOnNext data: Hello
inner #doOnNext data: World
inner #doOnNext data: !
#subscribe completed
collectList 오퍼레이터는 Flux에서 emit된 모든 데이터를 수집하여 List로 변환한 후, 이를 하나의 Mono로 감싸서 emit한다.
즉, Flux → Mono<List<T>> 로 변환되며, 단일 아이템만 emit된다.
Flux.range(1, 10)
.collectList()
.subscribe(data -> System.out.println("Data: " + data));
“Mono를 반환한다”는 말은 단일 값을 담고 있는 Publisher를 의미하며, 여기서는
List<Integer>하나를 emit한다는 의미이다.
Data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
collectMap 오퍼레이터는 Flux에서 emit된 각 데이터를 기반으로 Key와 Value를 생성한 후, 이를 Map의 형태로 수집하고, Mono<Map<K, V>>로 반환한다.
업스트림이 비어 있으면 빈 Map이 emit된다.
Flux.range(0, 26)
.collectMap(key -> key, value -> (char) (value + 65))
.subscribe(System.out::println);
{0=A, 1=B, 2=C, 3=D, 4=E, 5=F, 6=G, 7=H, 8=I, 9=J, 10=K, 11=L, 12=M, 13=N, 14=O, 15=P, 16=Q, 17=R, 18=S, 19=T, 20=U, 21=V, 22=W, 23=X, 24=Y, 25=Z}