[kotlin] flow buffering

문돌이 개발자·2025년 1월 5일

buffering

flow의 upstream과 downstream을 다른 coroutine context에서 실행하는건 전체 작업시간을 단축하는데 도움이 될 수 있다고 한다. 어떻게?

우선 하나의 context에서 모두 수행될 때를 살펴보자

private fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}

flow 빌더 body에서 100ms collect body에서 300ms씩 총 3번의 데이터를 처리하는 과정이다.

  • 결과
1
2
3
Collected in 1282 ms

1200ms 정도의 시간이 걸리게 된다.
컴퓨터 구조를 배우면 파이프라인에 대해서 알게 된다. 그러면 collect body를 처리하는동안 emit body가 쉬고있다는게 비효율적이란 걸 금방 깨달을 수 있다. 그래서 collect body가 수행되는 동안 다른 context를 통해 flow body를 실행하여 값을 저장해두는 것이 buffer()다.

    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")

time을 buffer()를 사용해서 위처럼 수정했다.

  • 결과
1
2
3
Collected in 1071 ms

예상할수 있듯이 200ms의 시간을 단축한 것을 알 수 있다. 재밌다 ㅎㅎ
앞서 살펴봤던 flowOn 함수에서도 동일한 매커니즘을 사용한다고 한다.

Conflation

방출된 값중 가장 최신의 값만 보여줄 필요가 있을때 중간값은 건너뛰는게 성능에 더 도움이 될 때가 있다. 이때 사용하는 게 Conflation이다. (아마 실시간으로 보여줘야하는 정보들에 해당되지 않을까?)

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

conflate()를 통해 가장 최신값만 받아 처리하게 되면

  • 결과
1
3
Collected in 765 ms

1을 처리하는 동안에 2와 3이 방출되었고 가장 최신값인 3만 처리해서 표시한다.

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

conflate는 위처럼 간단하게 구현되어 있다. 설명에는 CONFLATED는 buffer 호출에 설정된 여떤 capacity보다 우선 적용된다고 한다. buffer를 한번 볼까?

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // desugar CONFLATED capacity to (0, DROP_OLDEST)
    var capacity = capacity
    var onBufferOverflow = onBufferOverflow
    if (capacity == CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

CONFLATED를 우선적으로 처리하는 걸 볼 수 있다. 그리고 재미있는건 StateFlow에는 CONFLATED가 이미 적용되어 있는 것처럼 동작해서 CONFLATED를 적용해도 영향이 없다고 한다.

Processing the latest value

Conflation은 emitter와 collector가 둘다 느릴때 사용할 수 있는 하나의 방법이고 또 다른 방법은 수행중인 collector를 취소시켜버리고 새로운 value가 방출될 때마다 새로 collector 작업을 수행하는 것이다.
이 방법은 xxxLatest 친구들이 사용하는데, xxx 작업은 똑같이 수행하지만 새로운 value를 받으면 기존 작업중이던 coroutine은 취소시키고 새로 작업을 수행한다.

    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }

time을 collectLatest를 사용해서 최신값만 처리하도록 해봤다.

  • 결과
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 660 ms

1,2,3을 emit하는데 걸리는 시간 300ms 그리고 마지막 3을 처리하는 300ms를 더해서 총 600ms 정도 걸리게 됐다.

StateFlow가 기본적으로 CONFLATED가 적용된 것처럼 동작한다는 사실을 알게됐다. 즉 상태를 빠르게 방출하면 가장 최신의 상태를 처리해서 보여주는 원리를 알 수 있었다.

profile
까먹고 다시 보려고 남기는 기록

0개의 댓글