[kotlin] Flow Flattening에 종류를 알아보자

우발자·2025년 9월 9일
1

Flow Flattening이란 여러 Flow를 하나의 Flow로 만들어준다.

종류에는 flatMapConcat, flatMapMerge, flatMapLatest가 있다.


flatMapConcat

flatMapConcat에 가장 큰 특징은 순서와 데이터를 보장해준다는 점이다.

val flowA = flow {
    emit("A")
    emit("B")
    emit("C")
}

val flowB = flow {
    emit(1)
    emit(2)
    emit(3)

}

fun main() {
    runBlocking {
        flowA
            .flatMapConcat{ a -> flowB.map { b -> "$a - $b" } }
            .collect { println(it)}
    }
}

/**
A - 1
A - 2
A - 3
B - 1
B - 2
B - 3
C - 1
C - 2
C - 3
*/

2개의 플로우가 있고 flatMapConcat을 이용하여 flowA,flowB를 하나의 flow로 만든 결과값이다. 순서와 데이터가 모두 나오는 걸 볼 수 있다.
순서는 어떻게 보장될까?
내부 코드를 보면 이해할 수 있다. emitAll인 부분을 보면 현재 flow를 모두 방출한 후 다음 Flow로 넘어가는 걸 볼 수 있다. 여기서 순서와 데이터가 보장된다.

하지만 단점이 있다. 각 flow에 딜레이가 있으면 모두 방출을 하는데 시간이 많이 소요된다. 그래서 최신 데이터를 받기엔 어려움이 있다.


flatMapMerge

flatMapMerge는 위에 말한 단점을 해결할 수 있다.

이번엔 위와 다르게 flowA엔 딜레이 0.5초를 줬고 flowB엔 1초를 줘봤다.

val flowA = flow {
    emit("A")
    delay(500)
    emit("B")
    delay(500)
    emit("C")
}

val flowB = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() {
    runBlocking {
        flowA
            .flatMapMerge { a -> flowB.map { b -> "$a - $b" } }
            .collect { println(it)}
    }
}

/**
A - 1
B - 1
C - 1
A - 2
B - 2
C - 2
A - 3
B - 3
C - 3
*/

아까와 다르게 순서가 보장되지 않는 걸 볼 수 있다.
한마디로 2개의 플로우가 병렬적으로 수집된다고 보면 된다.

내부코드를 봐보자

flatMapConcat과 비슷해보이지만 concurrency라는 파라미터가 추가됐다.
flattenMerge코드를 들어가보면 concurrency가 1일 때는 flattenConcat과 같은 동작을 하는 걸 볼 수 있다. 즉, concurrency가 1일 때는 직렬적으로 동작을 하고 그게 flatMapConcat이라는 것도 알 수 있다.

그럼 1이 아닐때 동작하는 ChannelFlowMerge 부분도 살펴보자


ChannelFlowMerge#collectTo 핵심 로직만 가져왔다.
해당 로직을 보면 Semaphore에 concurrency값을 넣어 설정해준다.

*Semaphore는 동시에 접근할 수 있는 스레드를 제한하는 객체라고 보면 된다.

그 뒤 flow만큼 Coroutine을 생성하여 수집하여 채널에 넣어주고 있다.

inner = 
  #1 flowB.map { b -> "A - $b" }
  #2 flowB.map { b -> "B - $b" }
  #3 flowB.map { b -> "C - $b" }

inner는 사실상 저런 값을 가진다고 생각하면 된다.
저 3개를 collect하는 코루틴을 순차적으로 생성하여 emit을 하고 downstream에선 channel에서 값을 받아 쓴다.


flatMapLatest

collect와 collectLatest의 차이점을 알면 flatMapLatest도 쉽게 이해할 수 있을 것이다.

collectLatest도 값을 수집할 때 순서대로 수집하되 수집하는 도중 최신 값이 수집되면 취소하고 최신값을 바로 수집하는 동작 원리랑 똑같다고 보면 된다.
그래서 특징은 순서는 보장되지만 데이터가 보장이 안된다는 점이다.

val flowA = flow {
    emit("A")
    delay(500)
    emit("B")
    delay(500)
    emit("C")
}

val flowB = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() {
    runBlocking {
        flowA
            .flatMapLatest { a -> flowB.map { b -> "$a - $b" } }
            .collect { println(it)}
		
    }
}

/**
A - 1
B - 1
C - 1
C - 2
C - 3
*/

FlowA에서 A의 flow를 방출을 하고 있을 때 B의 flow가 시작되면 A의 플로우는 중지하고 B의 flow가 시작하게 된다. 그래서 A, B모두 1밖에 방출을 못했다.
ChannelFlowTransformLatest#flowCollect라는 로직을 가져왔는데 이 로직을 보면 이전 flow의 job을 취소하고 최신 flow로 collect를 하는 로직을 볼 수 있다.


요약

flatMapConcat

  1. 동기적으로 flow를 수집할 수 있게 그만큼 수집되는 데 지연될 수 있다.
  2. 순서 보장이 된다.
  3. 데이터 보장이 된다.

flatMapMerge

  1. 각 flow를 병렬적으로 방출해서 오래되는 작업이 있더라도 지연되지 않는다.
  2. 그대신 순서 보장이 안된다.
  3. 데이터 보장은 된다.

flatMapLatest

  1. flow를 수집하는 도중에 새로운 flow가 방출되면 현재 flow는 취소되고 새로운 flow가 방출된다.
  2. 순차적으로 방출은 된다. (순서 보장이라고 하기엔 유실될 수 있어서 애매...)
  3. 데이터 보장은 안된다.
profile
어제보다 나은 개발자가 되자

0개의 댓글