[Kotlin] flow Flattening Operators

sundays·2023년 4월 19일
0

coroutine

목록 보기
5/7

Flattening은 여러개의 flow가 합쳐져서 새로운 flow를 생성하는 것을 말합니다. 이번에 저는 하나의 ViewModel에서 동시에 여러 곳에서 비동기 데이터를 가져와서 조합하게 될때 사용할 flow를 찾기 위해 이 flattening operator의 존재를 알게 되었습니다. 예제를 만들다가 더 괜찮은 코드를 생각하지 못해서 예제코드를 가져왔는데 한눈에 파악하기 쉽게 합쳐 놓고 리턴하였습니다.

FlatMapConcat

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenConcat()

flatMapCat()은 데이터를 변환하여 flow들을 새로 생성한 후 flow들을 합쳐서 하나의 flow를 만들어냅니다.

예시 코드

fun main() = runBlocking {
    flowOf("A", "B", "C").flatMapConcat { value ->
        flowOf(1, 2, 3)
            .onEach { delay(1000L) }
            .map { "${it}_${value}" }
    }.collect { value ->
        println(value)
    }
}

[결과]
(delay 1sec)
1_A
(delay 1sec)
2_A
(delay 1sec)
3_A
(delay 1sec)
1_B
(delay 1sec)
2_B
(delay 1sec)
3_B
(delay 1sec)
1_C
(delay 1sec)
2_C
(delay 1sec)
3_C

flatMapConcat의 단점으로는 emit이 순차적으로 처리되어 flow를 생성하기 때문에 consumer가 받게 되는 시간이 오래걸립니다. 만약 delay()되는 연산이 존재한다면 producer에서 emit되었다고 해도 delay() 연산이 오래걸릴수록 다음 emit또한 함께 늦어지게 될것입니다.

FlatMapMerge

FlatMapMergeflatMapConcat의 성능을 개선하여 delay되는 연산이 존재 하더라도 다음 producer의 emit에는 영향을 주지 않습니다.

public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)
    
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

flatMapMerge()flattenConcat()의 기능을 하거나 ChannelFlowMerge()를 사용합니다. channel은 hot stream 이기 때문에 상태가 변경되면 선언과 상관없이 그 즉시 emit하게 됩니다.

예시 코드

suspend fun flatMapMerge() {
    flowOf("A", "B", "C").flatMapMerge { value ->
        flowOf(1, 2, 3)
            .onEach { delay(1000L) }
            .map { "${it}_${value}" }
    }.collect { value ->
        println(value)
    }
}

[결과]
(delay 1sec)
1_A
1_B
1_C
(delay 1sec)
2_A
2_B
2_C
(delay 1sec)
3_A
3_B
3_C

FlatMapLatest

가장 최신의 emit 만을 collect하는 operator입니다. flow가 consumer에게 소비되기 전에 데이터가 들어오면 이전 flow가 취소 됩니다.

public inline fun <T, R> Flow<T>
.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }

예시 코드

suspend fun flatMapLatest() {
    flowOf("A", "B", "C").flatMapLatest { value ->
        flowOf(1, 2, 3)
            .onEach { delay(1000L) }
            .map { "${it}_${value}" }
    }.collect { value ->
        println(value)
    }
}

[결과]
(delay 1sec)
1_C
(delay 1sec)
2_C
(delay 1sec)
3_C

결론

flatMapConcat은 여러개의 flow를 한개의 flow로 생성해주는 것이고 flatMapMerge는 여러개의 flow들을 병렬로 사용할 수 있는 flow이며 flatMapLatest는 최신 데이터들로 flow를 생성하는 것입니다. 다음으로는 flow 여러개를 가지고 조합하는 combine, zip, merge를 살펴보겠습니다.

Reference

profile
develop life

0개의 댓글