flow의 upstream과 downstream을 다른 coroutine context에서 실행하는건 전체 작업시간을 단축하는데 도움이 될 수 있다고 한다. 어떻게?
우선 하나의 context에서 모두 수행될 때를 살펴보자
private fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
flow 빌더 body에서 100ms collect body에서 300ms씩 총 3번의 데이터를 처리하는 과정이다.
1
2
3
Collected in 1282 ms
1200ms 정도의 시간이 걸리게 된다.
컴퓨터 구조를 배우면 파이프라인에 대해서 알게 된다. 그러면 collect body를 처리하는동안 emit body가 쉬고있다는게 비효율적이란 걸 금방 깨달을 수 있다. 그래서 collect body가 수행되는 동안 다른 context를 통해 flow body를 실행하여 값을 저장해두는 것이 buffer()다.
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
time을 buffer()를 사용해서 위처럼 수정했다.
1
2
3
Collected in 1071 ms
예상할수 있듯이 200ms의 시간을 단축한 것을 알 수 있다. 재밌다 ㅎㅎ
앞서 살펴봤던 flowOn 함수에서도 동일한 매커니즘을 사용한다고 한다.
방출된 값중 가장 최신의 값만 보여줄 필요가 있을때 중간값은 건너뛰는게 성능에 더 도움이 될 때가 있다. 이때 사용하는 게 Conflation이다. (아마 실시간으로 보여줘야하는 정보들에 해당되지 않을까?)
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
conflate()를 통해 가장 최신값만 받아 처리하게 되면
1
3
Collected in 765 ms
1을 처리하는 동안에 2와 3이 방출되었고 가장 최신값인 3만 처리해서 표시한다.
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
conflate는 위처럼 간단하게 구현되어 있다. 설명에는 CONFLATED는 buffer 호출에 설정된 여떤 capacity보다 우선 적용된다고 한다. buffer를 한번 볼까?
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
// desugar CONFLATED capacity to (0, DROP_OLDEST)
var capacity = capacity
var onBufferOverflow = onBufferOverflow
if (capacity == CONFLATED) {
capacity = 0
onBufferOverflow = BufferOverflow.DROP_OLDEST
}
// create a flow
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}
}
CONFLATED를 우선적으로 처리하는 걸 볼 수 있다. 그리고 재미있는건 StateFlow에는 CONFLATED가 이미 적용되어 있는 것처럼 동작해서 CONFLATED를 적용해도 영향이 없다고 한다.
Conflation은 emitter와 collector가 둘다 느릴때 사용할 수 있는 하나의 방법이고 또 다른 방법은 수행중인 collector를 취소시켜버리고 새로운 value가 방출될 때마다 새로 collector 작업을 수행하는 것이다.
이 방법은 xxxLatest 친구들이 사용하는데, xxx 작업은 똑같이 수행하지만 새로운 value를 받으면 기존 작업중이던 coroutine은 취소시키고 새로 작업을 수행한다.
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
time을 collectLatest를 사용해서 최신값만 처리하도록 해봤다.
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 660 ms
1,2,3을 emit하는데 걸리는 시간 300ms 그리고 마지막 3을 처리하는 300ms를 더해서 총 600ms 정도 걸리게 됐다.
StateFlow가 기본적으로 CONFLATED가 적용된 것처럼 동작한다는 사실을 알게됐다. 즉 상태를 빠르게 방출하면 가장 최신의 상태를 처리해서 보여주는 원리를 알 수 있었다.