
Flow Flattening이란 여러 Flow를 하나의 Flow로 만들어준다.
종류에는 flatMapConcat, flatMapMerge, flatMapLatest가 있다.
flatMapConcat에 가장 큰 특징은 순서와 데이터를 보장해준다는 점이다.
val flowA = flow {
emit("A")
emit("B")
emit("C")
}
val flowB = flow {
emit(1)
emit(2)
emit(3)
}
fun main() {
runBlocking {
flowA
.flatMapConcat{ a -> flowB.map { b -> "$a - $b" } }
.collect { println(it)}
}
}
/**
A - 1
A - 2
A - 3
B - 1
B - 2
B - 3
C - 1
C - 2
C - 3
*/
2개의 플로우가 있고 flatMapConcat을 이용하여 flowA,flowB를 하나의 flow로 만든 결과값이다. 순서와 데이터가 모두 나오는 걸 볼 수 있다.
순서는 어떻게 보장될까?
내부 코드를 보면 이해할 수 있다. emitAll인 부분을 보면 현재 flow를 모두 방출한 후 다음 Flow로 넘어가는 걸 볼 수 있다. 여기서 순서와 데이터가 보장된다.
하지만 단점이 있다. 각 flow에 딜레이가 있으면 모두 방출을 하는데 시간이 많이 소요된다. 그래서 최신 데이터를 받기엔 어려움이 있다.
flatMapMerge는 위에 말한 단점을 해결할 수 있다.
이번엔 위와 다르게 flowA엔 딜레이 0.5초를 줬고 flowB엔 1초를 줘봤다.
val flowA = flow {
emit("A")
delay(500)
emit("B")
delay(500)
emit("C")
}
val flowB = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
fun main() {
runBlocking {
flowA
.flatMapMerge { a -> flowB.map { b -> "$a - $b" } }
.collect { println(it)}
}
}
/**
A - 1
B - 1
C - 1
A - 2
B - 2
C - 2
A - 3
B - 3
C - 3
*/
아까와 다르게 순서가 보장되지 않는 걸 볼 수 있다.
한마디로 2개의 플로우가 병렬적으로 수집된다고 보면 된다.
내부코드를 봐보자

flatMapConcat과 비슷해보이지만 concurrency라는 파라미터가 추가됐다.
flattenMerge코드를 들어가보면 concurrency가 1일 때는 flattenConcat과 같은 동작을 하는 걸 볼 수 있다. 즉, concurrency가 1일 때는 직렬적으로 동작을 하고 그게 flatMapConcat이라는 것도 알 수 있다.
그럼 1이 아닐때 동작하는 ChannelFlowMerge 부분도 살펴보자

ChannelFlowMerge#collectTo 핵심 로직만 가져왔다.
해당 로직을 보면 Semaphore에 concurrency값을 넣어 설정해준다.
*Semaphore는 동시에 접근할 수 있는 스레드를 제한하는 객체라고 보면 된다.
그 뒤 flow만큼 Coroutine을 생성하여 수집하여 채널에 넣어주고 있다.
inner =
#1 flowB.map { b -> "A - $b" }
#2 flowB.map { b -> "B - $b" }
#3 flowB.map { b -> "C - $b" }
inner는 사실상 저런 값을 가진다고 생각하면 된다.
저 3개를 collect하는 코루틴을 순차적으로 생성하여 emit을 하고 downstream에선 channel에서 값을 받아 쓴다.
collect와 collectLatest의 차이점을 알면 flatMapLatest도 쉽게 이해할 수 있을 것이다.
collectLatest도 값을 수집할 때 순서대로 수집하되 수집하는 도중 최신 값이 수집되면 취소하고 최신값을 바로 수집하는 동작 원리랑 똑같다고 보면 된다.
그래서 특징은 순서는 보장되지만 데이터가 보장이 안된다는 점이다.
val flowA = flow {
emit("A")
delay(500)
emit("B")
delay(500)
emit("C")
}
val flowB = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
fun main() {
runBlocking {
flowA
.flatMapLatest { a -> flowB.map { b -> "$a - $b" } }
.collect { println(it)}
}
}
/**
A - 1
B - 1
C - 1
C - 2
C - 3
*/
FlowA에서 A의 flow를 방출을 하고 있을 때 B의 flow가 시작되면 A의 플로우는 중지하고 B의 flow가 시작하게 된다. 그래서 A, B모두 1밖에 방출을 못했다.
ChannelFlowTransformLatest#flowCollect라는 로직을 가져왔는데 이 로직을 보면 이전 flow의 job을 취소하고 최신 flow로 collect를 하는 로직을 볼 수 있다.