Kotlin Flow 파헤치기

최석우·2023년 11월 28일

Flow란?

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

→ 공식 문서 설명에서도 볼 수 있듯이, 비동기적으로 생성되는 value는 한개지만, 이 value의 묶음을 반환하기 위해서는 다른 객체가 필요한데, 그 객체를 flow라고 이야기하고 있다.

→ Sequence는 Flow와 비슷하지만, Thread Blocking

Flows are Cold

  • Official Docs
  • Flow의 중요한 성질이다. Reactive Programming의 주요 컨셉 (Back Pressure) 중 하나. Provider가 Cold하다는 의미는, 요청이 오기 전까지는 값을 만들고 있지 않는다는 뜻. 즉 필요하다고 요청이 와야 그때서야 가장 가까이에 있는 emit을 실행한다.
fun simple(): Flow<Int> = flow { 
    val v = Random.nextInt()
    println("Flow started $v")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
  • flow는 실제로 collect가 실행되기 전까지는 그 어떤 행도 실행되지 않는다.
    • “Calling simple function”과 “Calling collect…” 사이에 flow 함수의 그 어떤 행도 실행되지 않음.
  • collect가 두 번째로 호출되면, 새로운 flow를 생성하고 그 flow를 실행한다.
    • random으로 int를 생성하는 함수가 두 번 호출됨. 즉, 똑같은 flow를 다시 제공하는 것이 아니다.
suspend fun usefulFunction1(): Int {
    delay(2000)
    return 1
}
suspend fun usefulFunction2(): Int {
    delay(1000)
    return 2
}
suspend fun usefulFunction3(): Int {
    delay(3000)
    return 3
}

suspend fun getFlow(): Flow<Int> = flow {
    coroutineScope {
        emit(usefulFunction1())
        emit(usefulFunction2())
        emit(usefulFunction3())
    }
}

fun flowTest() {
    runBlocking {
        val f = getFlow()
        f.collect {
            println(it)
        }
    }
}
  • 이 예제 코드에서도 위의 cold 성질을 알 수 있다.
    • usefulFunction들은 동시에 실행되지 않고, 무조건 emit이 불릴때만 값을 계산하여 전달한다.

Flow with Coroutine - Provider

  • 위의 코드는 실무에서는 비효율적인 코드이다. 오래 걸리는 작업이 앞에서 blocker가 되어 뒤의 값들이 계산되지 못하고 있다. ⇒ 여기서 비동기적으로 emit을 하려고 하면 어떻게 해야할까?
suspend fun getFlow(): Flow<Int> = flow {
		coroutineScope {
        launch {
            emit(usefulFunction1())
        }
        launch {
            emit(usefulFunction2())
        }
        launch {
            emit(usefulFunction3())
        }
    }
}
  • getFlow함수의 내부에서 emit을 launch로 감싸면, 비동기로 실행해주지 않을까?

    • 이 코드는 에러를 발생시킨다. flow를 정의하는 coroutine scope와 emit하는 scope가 다르면 안된다.
  • 그럼 async를 걸고 deferred함수에 await을 해서 emit하는건 어떨까?

suspend fun getFlow(): Flow<Int> = flow {
    coroutineScope {
        val v1 = async { usefulFunction1() }
        val v2 = async { usefulFunction2() }
        val v3 = async { usefulFunction3() }
        emit(v1.await())
        emit(v2.await())
        emit(v3.await())
    }
}

fun flowTest() {
    runBlocking {
        val f = getFlow()
        f.take(1).collect {
            delay(2100)
            println(it)
        }
    }
}
  • 이러면, 함수를 미리 실행시켜놓을 수 있기 때문에 느린 작업들이 blocker가 되지 않고, 따라서 한 가지 해결방법으로 사용 가능하다.
  • 다만 이 방식의 문제점은, flow가 cold하다는 성질을 전혀 활용하지 못한다는 점이다.
  • flow의 collect가 호출되면, 호출되는 즉시 첫 번째 emit까지 코드를 모두 실행한다. 즉 우리는 v1.await()의 값만 필요로 하는데 이미 v1, v2, v3의 값들의 준비 작업(async로 쓰여진 코드들)이 시작되었음을 의미한다. 본 예제에서는 3개의 값이지만, 1000개의 비동기 작업이 있었다고 하면 consumer가 값을 하나만 요청했더라도 1000개의 비동기 작업을 모두 실행하게 되어버린다.

ChannelFlow

suspend fun usefulFunction(i: Int): Int {
		if (i==1) {
				// when i==1, really long running job
        delay(10_000)
    }
    delay(2000)
    println("ready $i")
    return i
}

suspend fun getFlow(): Flow<Int> = channelFlow {
    for(i in 1..1000) {
				launch {
		        send(usefulFunction(i))
				}
    }
}
  • channelFlow를 사용하면 child coroutine에서 비동기적으로 발행되는 값들이 채널에 먼저 들어가고, 채널에 들어간 순서대로 flow를 만들어준다.
    • 이 방식대로 하면, flow를 collect했을 때 비동기적으로 usefulFunction을 호출하여 값을 채워주게 된다.
  • 그러나 이 방법에도 문제가 있다
    • flow에서는 collect가 불리는 순간에 emit을 실행했지만, 이렇게 coroutine에서 비동기적으로 값을 생성하고 있는 경우에는 collect가 하나의 원소만 불렀더라도, 수많은 (1000개의) usefulFunction 함수들이 모두 실행된다. 즉 back pressure가 적절하게 작동하지 않게 된다.
    • 앞선 예제의 방법과 동일한 형태의 문제점이다. 값을 불러오는 함수를 모두 호출해서 결과를 받아놓고, 필요할때마다 내보내는 방식이다.

val s  = Semaphore(5)

suspend fun getFlow(): Flow<Int> = channelFlow {
    for(i in 1..100) {
        launch {
            s.withPermit {
                send(usefulFunction(i))
            }
        }
    }
}

fun flowTest() {
    runBlocking {
        val f = getFlow()
        f.collect {
            delay(1000)
            println(it)
        }
    }
}
  • 따라서, Coroutine에서 동시에 실행되는 함수의 숫자를 Semaphore를 통해 조절해보자. 이렇게 하면, 5개만 미리 요청을 보내지 않을까?
    • 함수가 실행되는 최대 횟수 자체는 5번으로 제한되지만, flow에 준비되는 객체의 개수가 5개인 것은 아니다.

A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.

  • 왜냐하면 channelFlow는 기본적으로 default buffer size(64)개 만큼의 원소를 미리 send 하는 것을 허용한다. (즉 channel의 사이즈가 64) 따라서 이 채널의 크기를 줄여야 정말 우리가 원하는 만큼의 함수만 미리 실행될 것이다.
  • 이 buffer size를 조절하기 위해서는 channelFlow의 끝에 .buffer(buffer_size)를 추가하면 된다.
suspend fun usefulFunction(i: Int): Int {
    if (i==1) {
        delay(10_000)
    }
    delay(2000)
    println("ready $i")
    return i
}

val s  = Semaphore(5)
suspend fun getFlow(): Flow<Int> = channelFlow {
    for(i in 1..100) {
        launch {
            s.withPermit {
                send(usefulFunction(i))
            }
        }
    }
}.buffer(0)

fun flowTest() {
    runBlocking {
        val f = getFlow()
        f.collect {
            delay(2100)
            println(it)
        }
    }
}
  • 이제 coroutine과 flow가 적절하게 작동하게 된다.
  • buffer를 0으로 주는 이유는, channel에 값을 넣기 위한 launch 작업이 이미 최대 5개로 고정되어 있으므로 우리는 5개의 bufferred item이 있는 것과 같다.
  • 실행해보면 ready 라는 로그는 5개 초과로 연속해서 쌓이지 않는다.
  • 단, 이 방법은 usefulFunction의 call 순서와 반환 순서의 일치를 보장하지 않는다. (즉 먼저 실행된 함수의 결과값이 꼭 먼저 나오지는 않는다.) 따라서 순서가 중요한 요청들에 대해서는 그냥 flow에 적절한 비동기를 섞어서 사용하는 것이 더 좋다.
suspend fun usefulFunction(i: Int): Int {
    if (i==1) {
        delay(10_000)
    }
    delay(2000)
    println("ready $i")
    return i
}

suspend fun getFlow(): Flow<Int> = flow {
    coroutineScope {
        for(i in 1..100) {
            emit (
                async { usefulFunction(i) }
            )
        }
    }
}.buffer(5).transform { emit(it.await()) }

fun flowTest() {
    runBlocking {
        val f = getFlow()
        f.collect {
            delay(2100)
            println("collect $it")
        }
    }
}
  • getFlow함수를 다시 flow로 설정하고, 대신 첫 번째 flow가 emit하는 것을 deferred 객체로 만들어준다. 그리고, 그 사이에 버퍼를 둔 다음 값을 기다려서 emit하도록 flow를 만들어보자.
  • 이렇게 되면, (buffer가 없을 때는) collect가 불릴때마다 비동기 객체를 만들고, 그것을 기다려서 emit해준다. 이때 buffer의 역할은 5개정도의 usefulFunction의 값들이 미리 대기할 수 있도록 coroutine을 실행시켜준다는 것이다.
  • 실제로 함수를 실행시켜보면, i가 1일때 매우 오래걸리므로 ready 2~7 (buffer에 들어있는 값들)이 먼저 찍히고, 1이 끝나서 collect 되면 다음 함수들이 미리 대기되는 모습을 확인할 수 있다.
  • 또한 이 방식은 처음 값이 생성되는 순서와 collect에 의해 받아들여지는 순서가 보장된다. (단 long running job이 있는 경우 blocker가 될 수 있다는 점을 염두에 두어야 한다.)

Flow with Coroutine - Consumer

  • 지금까지는 Provider 쪽에서 비동기적으로 값을 만들어내고, 그 값들을 flow의 형태로 반환하는 방법에 대해서 알아보았다.
  • 이제는 Consumer 쪽에서 값들을 어떻게 비동기적으로 소비할 수 있을지 알아보자.
// Super fast provider
fun getFlow() = flow {
    for (i in 1..100) {
        delay(1)
				println("emit $i")
        emit(i)
    }
}

suspend fun usefulFunction(i: Int) {
    delay(100)
    println("useful $i")
}

fun flowTest() {
    val f = getFlow()
    runBlocking {
        f.collect {
            usefulFunction(it)
        }
    }
}
  • 이번엔 consumer쪽에서 usefulFunction이 호출되는 상황이다. (이런 useful function들은 나중에 실무에서는 API call, DB call, external API call 등으로 대체될 수 있다.)
  • 위의 예제에서도 우리는 역시 flow가 cold함을 알 수 있다.
    • emit과 useful 로그가 번갈아가면서 나오는 것으로 알 수 있다.
  • 이 코드는 좋지 못하다. 왜냐하면, 동시에 여러 usefulFunction을 실행하면 금방 처리되지만, provider가 빠르게 (1ms의 delay로) 값을 내보내고 있는데, 처리하는 쪽은 시간이 좀 더 걸리는 (100ms) 함수이다. 우리는 이 함수가 비동기적으로 coroutine에서 처리되길 바란다.
var runCount = 0
suspend fun usefulFunction(i: Int) {
    runCount++
    if (i==1) {
        delay(10_000)
    }
    delay(1000)
    println("useful $i")
    if (runCount > 10) {
        throw Exception("Function is not available!!")
    }
    runCount--
}

fun flowTest() {
    val f = getFlow()
    runBlocking {
        f.collect {
            launch {
                usefulFunction(it)
            }
        }
    }
}
  • 실행되는 부분을 launch를 사용해서 비동기적으로 실행해보자.
  • 이러면 앞에서 문제점으로 꼽았던 동기적으로 실행되던 문제는 해결됐지만, 다른 문제가 발생하였다.
  • consumer쪽도 무작정 비동기로 모든 값들을 받아서 처리하는게 아니라, consume capacity에 맞게 (실제 예시에서는 API 혹은 DB의 Throughput 값들) 적절한 양만큼만 실행해줘야 한다.
  • usefulFunction이 10개 이상 실행된다면 exception을 던지도록 만들어뒀으므로, 위의 코드를 실행해보면 에러가 발생한다.
fun flowTest() {
    val f = getFlow()
    val s = Semaphore(5)
    runBlocking {
        f.collect {
						// does not suspended
            launch {
                s.withPermit {
                    usefulFunction(it)
                }
            }
        }
    }
}
  • 따라서, collect 하는쪽에서 semaphore를 사용하여 최대로 실행될 수 있는 함수의 수를 5개 이하로 조정해보자. 이렇게 수정한 코드는, Function not available exception을 던지지 않는다.
  • 또한 usefulFunction은 1이 들어왔을 때 굉장히 오래 걸리는 작업인데, 해당 작업과 무관하게 나머지 작업들이 잘 실행되는 것을 확인할 수 있다.
  • 다만, 이 코드 또한 collect의 실행 방식 때문에 문제가 된다.
    • flow는 collect 내부의 구문이 한번 실행되면 consumer가 provider의 emit을 하나 처리했다고 인지한다. 그런데, 이 coroutine launch 구문의 특성은 fire and forget으로, child coroutine에서 해당 작업을 시작만 하고 바로 해당 구문을 넘어가버리기 때문에, provider쪽에서는 consumer가 launch구문을 fire하기만 해도, 이미 처리됐다고 생각하고 다음 emit을 진행해버린다.
    • 실제로 코드를 실행해보면, 아직 useful이 한번도 실행되지 않았지만 이미 “emit” 이 모두 출력되는 것을 확인할 수 있다.
    • 즉 provider쪽에서 실제 consumer의 진행상황을 인지하지 못하고 값을 계속 만들어서 제공하게 된다.
fun flowTest() {
    val f = getFlow()
    val s = Semaphore(5)
    runBlocking {
        f.collect{
            s.acquire() // suspended when resource not available
            launch {
                usefulFunction(it)
                s.release() // release it
            }
        }
    }
}
  • 따라서 release가 launch 안에서 일어나야 semaphore가 정상적으로 작동한다.
var runCount = 0
suspend fun usefulFunction(i: Int): Int {
    runCount++
    println("useful $i start")
    delay(1000)
    if (runCount > 10) {
        throw Exception("Function is not available!!")
    }
    println("useful $i end")
    runCount--
    return i
}

suspend fun worker(c: Channel<Int>) {
    while(true) {
        try {
            val v = c.receive()
            usefulFunction(v)
            println("done $v")
        } catch (_: ClosedReceiveChannelException) {
            break
        }
    }
}

fun flowTest() {
    val f = getFlow()
    val c = Channel<Int>(1)
    runBlocking {
        repeat(5){
            launch {
                worker(c)
            }
        }
        f.collect {
            c.send(it)
        }
        c.close()
    }
}
  • 기존 C#처럼 worker를 사용해서도 구현이 가능하다.
  • 따라서, channel을 사용하여 background에 돌고있는 비동기 worker에게 받아온 값을 전달하고, worker의 숫자를 5개로 제한하면 5개의 function만 동시에 실행되게 된다. (worker만이 function을 실제로 실행하는 주체이고, 이 개수는 5개뿐이다.)
profile
SNU computer science & engineering

0개의 댓글