Flow의 최신 값을 처리하는 방법 중 하나인 collectLatest 함수는 기존의 collect 함수와 동일하게 수집하는 함수이지만 새로운 값이 생산되면 이전 코드를 취소하는 함수이다. collectLatest 함수처럼 함수 이름 뒤에 Latest가 붙은 xxxLatest 함수들은 기존의 xxx 함수와 동일한 동작을 하지만 새로운 값이 생산되면 이전 코드를 취소하는 특징을 가졌다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000) // pretend we are asynchronously waiting 1000 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(3000) // pretend we are processing it for 3000 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
/* 결과 :
(1초 후)
Collecting 1
(1초 후)
Collecting 2
(1초 후)
Collecting 3
(3초 후)
Done 3
Collected in 6052 ms
*/
위 코드의 동작은 다음과 같다.
1. collectLatest를 통해 수집 시작. flow는 Cold Stream이기 때문에 수집이 시작되어야 방출이 시작된다.
2. 1초 delay 후 emit 1
3. Collecting 1 출력 후 3초 delay
4. 그 사이에 1초 delay 후 emit 2
5. 이전 코드가 취소되어 Done 1은 출력되지 않고 새로운 코드가 실행되어 Collecting 2 출력 후 3초 delay
6. 그 사이에 1초 delay 후 emit 3
7. 이전 코드가 취소되어 Done 2는 출력되지 않고 새로운 코드가 실행되어 Collecting 3 출력 후 3초 delay
8. 더이상 방출되는 데이터가 없기 때문에 그대로 3초 후 Done 3 출력
9. 최종적으로 걸린 시간 출력
최종적으로 1초 + 1초 + 1초 + 3초 = 약 6초의 시간이 소요된다.
collectLastest를 사용하면 어떤 스레드를 사용하고 어떤 코루틴을 사용하는지 알고 싶어서 코드를 아래와 같이 바꿔보았다. 달라진 것은 println() 함수를 중간중간 추가시켰다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000) // pretend we are asynchronously waiting 1000 ms
println("Emitting $i, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest {
println("Collecting $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
delay(3000) // pretend we are processing it for 3000 ms
println("Done $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
}
}
println("Collected in $time ms")
}
/* 결과 :
Emitting 1, Thread : main @coroutine#2, currentCoroutineContext: [CoroutineId(2), "coroutine#2":ScopeCoroutine{Active}@2ef9b8bc, BlockingEventLoop@5d624da6]
Collecting 1, Thread : main @coroutine#3, currentCoroutineContext: [CoroutineId(3), "coroutine#3":StandaloneCoroutine{Active}@153f5a29, BlockingEventLoop@5d624da6]
Emitting 2, Thread : main @coroutine#2, currentCoroutineContext: [CoroutineId(2), "coroutine#2":ScopeCoroutine{Active}@2ef9b8bc, BlockingEventLoop@5d624da6]
Collecting 2, Thread : main @coroutine#4, currentCoroutineContext: [CoroutineId(4), "coroutine#4":StandaloneCoroutine{Active}@528931cf, BlockingEventLoop@5d624da6]
Emitting 3, Thread : main @coroutine#2, currentCoroutineContext: [CoroutineId(2), "coroutine#2":ScopeCoroutine{Active}@2ef9b8bc, BlockingEventLoop@5d624da6]
Collecting 3, Thread : main @coroutine#5, currentCoroutineContext: [CoroutineId(5), "coroutine#5":StandaloneCoroutine{Active}@ea1a8d5, BlockingEventLoop@5d624da6]
Done 3, Thread : main @coroutine#5, currentCoroutineContext: [CoroutineId(5), "coroutine#5":StandaloneCoroutine{Active}@ea1a8d5, BlockingEventLoop@5d624da6]
Collected in 6141 ms
*/
스레드는 똑같이 메인 스레드를 사용하고 생산(Emitting)하는 쪽의 코루틴은 동일한 것을 확인할 수 있다. 하지만 소비(collecting)하는 쪽의 코루틴이 계속 변경된다. 생산하는 코루틴은 ScopeCoroutine인데 소비하는 코루틴은 StandaloneCoroutine이다. 어떻게 이렇게 되는지 내부 코드를 통해 알아보자.
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
mapLatest(action).buffer(0).collect()
}
collectLatest는 내부적으로 mapLatest 함수를 사용하는데 이를 따라가보자.
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
transformLatest { emit(transform(it)) }
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
ChannelFlowTransformLatest(transform, this)
코드 내부를 계속 따라가다보면 mapLatest -> transformLatest를 통해 최종적으로 ChannelFlowTransformLatest 클래스를 확인할 수 있다.
internal class ChannelFlowTransformLatest<T, R>(
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
override suspend fun flowCollect(collector: FlowCollector<R>) {
assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
coroutineScope {
var previousFlow: Job? = null
flow.collect { value ->
previousFlow?.apply {
cancel(ChildCancelledException())
join()
}
// Do not pay for dispatch here, it's never necessary
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
}
}
}
여기서 flowCollect 함수가 바로 핵심 로직이다. coroutineScope 함수가 내부적으로 ScopeCoroutine을 생성한다. 그리고 값을 방출할 때마다 flow.collect 블록 내부의 코드가 실행된다.
초깃값은 null이기 때문에 cancel과 join이 실행되지 않는다. 그리고 launch 함수를 통해 previousFlow가 초기화된다. launch 함수는 내부적으로 StandaloneCoroutine을 생성한다.
{
println("Collecting $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
delay(3000) // pretend we are processing it for 3000 ms
println("Done $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
}
그리고 collector.transform(value)가 실행될 때 예시 코드의 위 수집 코드 블록이 실행된다.
그래서 ScopeCoroutine 내에서 맨처음에 1 방출 -> previousFlow에서 collect.transform(value)를 통해 수집 코드 블록 실행 순으로 코드가 동작한다.
그렇다면 여기서 새로운 값인 2가 방출된다면? -> previousFlow가 null이 아니기 때문에 이전 수집 코드를 실행하던 코루틴이 취소된다. 그리고 join 함수를 통해 취소가 완료될 때까지 기다린다. 그 다음 새로 방출된 값인 2를 통해 수집 코드 블록이 실행된다.
즉 collectLatest 함수에서 내부적으로 사용된 mapLatest 함수는 내부적으로 기존 코루틴이 취소되고 새로운 코루틴을 생성하는 방식으로 이전 작업을 취소하는 것이다.
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
mapLatest(action).buffer(0).collect()
}
이번엔 collectLatest 함수의 buffer(0) 부분에 대해 알고 싶어졌다. buffer에 capacity로 0을 전달하게 되면 이는 capacity 상수 중 RENDEZVOUS를 의미한다.
/**
* Requests a rendezvous channel in the `Channel(...)` factory function — a channel that does not have a buffer.
*/
public const val RENDEZVOUS: Int = 0
설명에 따르면 버퍼를 가지지 않는 채널(Channel)을 위해 사용되는 상수이다. 데이터를 방출하는 코루틴과 수집하는 코루틴 사이의 통신을 채널을 통해 진행하는데, 버퍼를 가지지 않는 채널을 사용하는 것이다.
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.mapLatest(
transform = {
println("Collecting $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
delay(3000) // pretend we are processing it for 3000 ms
println("Done $it, Thread : ${Thread.currentThread().name}, currentCoroutineContext: ${currentCoroutineContext()}")
}
)
.collect()
}
println("Collected in $time ms")
}
사실 buffer(0)를 사용하는 것과 사용하지 않는 것의 차이를 알아보기 위해 코드를 위와 같이 mapLatest()를 직접 사용하면서 buffer(0)를 호출하지 않는 것으로 바꿔보았는데 출력되는 결과는 기존과 같았다. 그래서 안타깝게도 명확히 차이를 알 수는 없었지만 차이를 알 수 있는 다른 예시를 알게 되면 추후 추가하려 한다.
지금 상황에서 buffer(0)를 호출한 이유를 추측해보자면 아마도 이 또한 최신값을 수집하기 위해서라고 생각한다. ChannelFlowTransformLatest 클래스의 flowCollect 함수 내부를 살펴보면서 기존 코루틴이 취소되고 새로운 코루틴이 생성되는 과정을 살펴보았는데 사실 이건 mapLatest 함수의 내부 동작이다. mapLatest는 사실 변환하는 함수인 것이지, 최종적인 수집까지 이루어지는 함수는 아니다.
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.mapLatest(
transform = {
// ...
}
)
}
println("Collected in $time ms")
}
/* 결과
Collected in 5 ms
*/
그래서 위 코드처럼 mapLatest()만 사용하고 .collect()를 호출하지 않으면 값의 수집이 전혀 이루어지지 않는다. (Flow는 Cold Stream이어서 방출도 이루어지지 않는다.)
그래서 .buffer(0).collect()를 호출함으로써 값을 수집할 때도 최신값만 수집되도록 하기 위한 것이라고 생각한다. buffer(0)를 호출하지 않는다면 채널 버퍼의 기본 용량인 64가 적용된다.
// Channel.kt
internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
64, 1, UNLIMITED - 1
)
그래서 만약 방출된 값이 버퍼에 쌓인다면 최신값이 아닌 이전의 값을 제공받을 수도 있다. 그래서 그러한 예상치 못한 가능성을 차단하기 위해서 버퍼를 0으로 설정한 거라고 생각한다.
collectLatest는 지금까지의 예시에서 볼 수 있듯이 방출과 수집이 다른 코루틴에서 이루어지기 때문에 일단 컨텍스트 보존은 지켜지지 않는다는 것은 확실하다.
그런데 collectLatest가 순차적으로 처리되는가에 대해서는 좀 애매한 것 같다.
일반적인 순차처리는
1. 데이터 생산 -> 2. 소비 -> 3. 소비 완료 후 새로운 데이터 생산
이러한 방식으로 이루어지는데 collectLatest는 새로운 데이터가 생산될 때 이전 소비 작업이 취소되는 방식이기 때문이다.
그래서 동작하는 것을 보면 순차적으로 처리하는 것처럼 보이지만, 일반적인 순차처리 방식은 아니라고 생각한다.