Flattening은 여러개의 flow가 합쳐져서 새로운 flow를 생성하는 것을 말합니다. 이번에 저는 하나의 ViewModel에서 동시에 여러 곳에서 비동기 데이터를 가져와서 조합하게 될때 사용할 flow를 찾기 위해 이 flattening operator의 존재를 알게 되었습니다. 예제를 만들다가 더 괜찮은 코드를 생각하지 못해서 예제코드를 가져왔는데 한눈에 파악하기 쉽게 합쳐 놓고 리턴하였습니다.
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenConcat()
flatMapCat()
은 데이터를 변환하여 flow들을 새로 생성한 후 flow들을 합쳐서 하나의 flow를 만들어냅니다.
fun main() = runBlocking {
flowOf("A", "B", "C").flatMapConcat { value ->
flowOf(1, 2, 3)
.onEach { delay(1000L) }
.map { "${it}_${value}" }
}.collect { value ->
println(value)
}
}
[결과]
(delay 1sec)
1_A
(delay 1sec)
2_A
(delay 1sec)
3_A
(delay 1sec)
1_B
(delay 1sec)
2_B
(delay 1sec)
3_B
(delay 1sec)
1_C
(delay 1sec)
2_C
(delay 1sec)
3_C
flatMapConcat
의 단점으로는 emit이 순차적으로 처리되어 flow를 생성하기 때문에 consumer가 받게 되는 시간이 오래걸립니다. 만약 delay()되는 연산이 존재한다면 producer에서 emit되었다고 해도 delay() 연산이 오래걸릴수록 다음 emit또한 함께 늦어지게 될것입니다.
FlatMapMerge
는 flatMapConcat
의 성능을 개선하여 delay되는 연산이 존재 하더라도 다음 producer의 emit에는 영향을 주지 않습니다.
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
flatMapMerge()
는 flattenConcat()
의 기능을 하거나 ChannelFlowMerge()
를 사용합니다. channel은 hot stream 이기 때문에 상태가 변경되면 선언과 상관없이 그 즉시 emit하게 됩니다.
suspend fun flatMapMerge() {
flowOf("A", "B", "C").flatMapMerge { value ->
flowOf(1, 2, 3)
.onEach { delay(1000L) }
.map { "${it}_${value}" }
}.collect { value ->
println(value)
}
}
[결과]
(delay 1sec)
1_A
1_B
1_C
(delay 1sec)
2_A
2_B
2_C
(delay 1sec)
3_A
3_B
3_C
가장 최신의 emit 만을 collect하는 operator입니다. flow가 consumer에게 소비되기 전에 데이터가 들어오면 이전 flow가 취소 됩니다.
public inline fun <T, R> Flow<T>
.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
transformLatest { emitAll(transform(it)) }
suspend fun flatMapLatest() {
flowOf("A", "B", "C").flatMapLatest { value ->
flowOf(1, 2, 3)
.onEach { delay(1000L) }
.map { "${it}_${value}" }
}.collect { value ->
println(value)
}
}
[결과]
(delay 1sec)
1_C
(delay 1sec)
2_C
(delay 1sec)
3_C
flatMapConcat은 여러개의 flow를 한개의 flow로 생성해주는 것이고 flatMapMerge는 여러개의 flow들을 병렬로 사용할 수 있는 flow이며 flatMapLatest는 최신 데이터들로 flow를 생성하는 것입니다. 다음으로는 flow 여러개를 가지고 조합하는 combine, zip, merge를 살펴보겠습니다.