public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
이 Flux
에서 비동기적으로 방출되는 요소를 Publishers
로 변환한 다음, 병합을 통해 이러한 inner publishers
를 single Flux
스로 평탄화하여 상호 연결할 수 있다.
이 오퍼레이터는 다른 이너를 기다리지 않고 미리 구독한다.
이 연산자는 내부 요소가 도착할 때 평활화되므로 원래 순서가 반드시 유지되지 않는다.
이 연산자를 사용하면 서로 다른 내부의 값을 인터리빙할 수 있습니다(내부 시퀀스를 병합하는 것과 유사).
@Test
void flatMapOperator() {
Shop shop = Flux.just(
Shop.builder().shopId(1).size(0).build(), // 800ms
Shop.builder().shopId(2).size(5).build(), // 200ms
Shop.builder().shopId(3).size(5).build(), // 100ms
Shop.builder().shopId(4).size(0).build() // 0ms
)
.flatMap(this::doSomethingAsync) // delay 발생시킴
.takeUntil(Shop::hasSize)
.doOnNext(n -> log.info("Done {}", n))
.blockLast();
System.out.println("shop = " + shop);
}
4개의 요소 모두 비동기로 실행하기 때문에 다같이 실행된다.
hasSize
조건에 만족하는 id=3
이 지연시간 100ms 로 id=2
보다 빠르기 때문에 가장 먼저 가져온다.
총 소요시간은 195ms 걸렸다.
테스트 실행시간이지만 Flux 연산 속도와 크게 차이나지 않는다. 10ms이내
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
이 Flux
에서 비동기적으로 방출되는 요소를 Publishers
로 변환한 다음, 이러한 inner publishers
를 single Flux
로 병합하되 소스 요소의 순서대로 병합합니다.
이 오퍼레이터는 다른 이너를 기다리지 않고 미리 구독한다. (flatMap과 동일)
이 연산자는 앞쪽 이너의 모든 요소가 방출될 때까지 뒤쪽 이너의 요소를 큐에 대기시켜 소스 순서와 일치하는 순서로 전체적으로 이너 시퀀스를 방출합니다.
이 연산자는 서로 다른 내부의 값이 인터리빙되지 않도록 합니다(concatMap과 비슷해 보이지만, 그렇지 않으면 인터리빙되었을 값의 큐에 대기열이 생기기 때문입니다).
@Test
void flatMapOperator() {
Shop shop = Flux.just(
Shop.builder().shopId(1).size(0).build(), // 800ms
Shop.builder().shopId(2).size(5).build(), // 200ms
Shop.builder().shopId(3).size(5).build(), // 100ms
Shop.builder().shopId(4).size(0).build() // 0ms
)
.flatMapSequential(this::doSomethingAsync) // delay 발생시킴
.takeUntil(Shop::hasSize)
.doOnNext(n -> log.info("Done {}", n))
.blockLast();
System.out.println("shop = " + shop);
}
4개의 요소 순차적으로 실행된다.
id=2
과 id=3
이 hasSize
조건에 만족한다. 순서가 보장되기 때문에 최종적으로 id=2
을 가져온다.
총 소요시간은 901ms 걸렸다.
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
이 Flux
에서 비동기적으로 방출되는 요소를 Publishers
로 변환한 다음, 연결을 사용하여 순서를 유지하면서 순차적으로 이러한 inner publishers
를 single Flux
로 평탄화합니다.
이 오퍼레이터는 하나의 이너가 완료될 때까지 기다렸다가 다음 이너를 생성하고 구독합니다.
이 연산자는 각 소스 요소의 안쪽을 순차적으로 연결하여 소스 요소와 동일한 순서를 자연스럽게 유지합니다.
이 연산자는 서로 다른 내부의 값이 인터리빙(연결)되지 않도록 합니다.
@Test
void flatMapOperator() {
Shop shop = Flux.just(
Shop.builder().shopId(1).size(0).build(), // 800ms
Shop.builder().shopId(2).size(5).build(), // 200ms
Shop.builder().shopId(3).size(5).build(), // 100ms
Shop.builder().shopId(4).size(0).build() // 0ms
)
.concatMap(this::doSomethingAsync) // delay 발생시킴
.takeUntil(Shop::hasSize)
.doOnNext(n -> log.info("Done {}", n))
.blockLast();
System.out.println("shop = " + shop);
}
concatMap 역시 순차적으로 실행되지만 모든 요소를 실행하지 않고 완료될때까지 기다린다.
id=1
은 hasSize
조건에 만족하지 않아 다음 요소(id=2
)를 실행한다.
id=2
이 조건에 만족하기 때문에 방출하고 종료한다.
총 소요시간은 1107ms 걸렸다.
👉 테스트 코드는 깃헙에 있어요