[Coroutine] Flow 내부 파헤치기

JunHyeong Lee·2023년 6월 9일

Coroutine

목록 보기
2/2
post-thumbnail

심명표 님의 코루틴 플로우 내부 살펴보기 1 를 읽고 코루틴 내부를 이해해본 내용을 정리하고자 한다.

처음엔 Cold Stream 과 Hot Stream의 차이점에 대해 알아보려 했는데 먼저 Flow에 대해 깊게 이해해보는것 부터 시작하고 싶었다.

Flow builder 를 가장 간단하게 만들 수 있는 방법이다.

flow<Int> {
    emit(10)
    delay(1000)
    emit(20)
}

Flow Builder를 만드는 것은 어떤 의미인가??
저 빌더를 생성하고 있는 코드를 살펴보자.

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

여기서 주목할 것은 block 이다. 이 block은 FlowCollector<T>.() 를 수신기로 하고 있다. 수신기로 하고 있다는 말은 이 FlowCollector 인터페이스의 함수인 emit 을 미리 정의하고 이를 구현하는 객체를 받아와 나중에 실행 시킨다는 뜻이다.

그럼 이 FlowCollector 객체를 언제 생성할까??
눈치 챘을 지 모르겠지만 바로 collect 시점이다.

flow<Int> {
    ...
}.collect {
    println("value: $it")
}

그렇다면 collect 의 내부 코드를 또 확인해봐야 한다.

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

여기서 봤을 때 우리가 항상 사용하는 collect 함수는 Flow 의 확장 함수로 만들어져있고 value를 인자로 한 람다함수를 매개변수로 받는다. 이 확장함수는 collect(object: FlowCollector<T> {... 이렇게 또 collect 함수를 실행하고 있다.

여기서부터 이해하는데 너무 힘들었다. 그렇다면 이 collect 의 출처는 또 어디인가?
Flow 를 상속받아 구현하고있는 클래스의 collect 일 것이다. 이 collect의 출처는 위에 flow builder에서 확인할 수 있었다.
바로 SafeFlow 를 확인해보자.

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

아직까진 뭐가 보이진 않는다. 다만 AbstractFlow 라는 클래스를 상속받고 있다는 것을 확인할 수 있다.
AbstractFlow도 그럼 확인해보자.

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

우리가 찾던 Flow를 구현하고 있는 클래스이다. 여기서 보면 collect를 override 하여 구현하고 있다. 그렇다면 인자로 받는 FlowCollector는 아까 collect 확장함수를 생성할 때 익명 클래스(anonymous class)로 생성한 것임을 이해할 수 있다.

그럼 구현된 collect 함수가 어떤 작업을 하는지 이해한다면 Flow의 내부 동작을 파악할 수 있을 것이다.

여기까지 잘 따라왔다면 이제 AbstractFlow의 collect 함수 내부를 보자.
SafeCollector 라는 클래스를 생성하고 collectSafely 함수에 인자로 넘겨주는걸 확인할 수 있다. SafeCollector는 또 뭔가....

@Suppress("UNCHECKED_CAST")
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

	override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                //...
            }
        }
    }
    
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        //...
        val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        //...
        return result
    }
}

SafeCollector는 FlowCollector와 ContinuationImpl을 상속받아 구현된 class 이다. 정확히 어떻게 동작하는지는 확실히 모르겠으나 적절한 인수로 함수를 호출하여 FlowCollector를 통해 안전하게 방출한다 정도로 이해하고 넘어가겠다.

본론으로 돌아가서
collectSafely?? 어딘가 익숙한데.. 이 abstract function 은 아까 AbstractFlow를 상속받은 SafeFlow에서 구현하고 있었다.
다시 확인해보자.

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

보면 collectSafely는 AbstractFlow에서 구현된 collect의 인자로 받은 FlowCollector를 가지고 만든 SafeCollector를 넘겨받아 생성자로 받은 block을 실행시키고 있다.
이게 어떤 의미냐면 이 collector는 flow {...}.collect 에서 collect 람다를 만들 때 생긴 FlowCollector를 참조하고 있고 block은 flow builder 즉 flow {...} 이 block을 말한다.

앞에서 block은 FlowCollector를 수신기로 사용하고 FlowCollector 객체를 받아와 나중에 실행한다고 말했는데, 결과적으로 collect 함수를 실행시켰을 때 생성된 FlowCollect를 넘겨주는 것이었다. 그리고 block에서 emit함수를 실행시키면 전달된 value를 collect 람다에서 받아 사용한다.

이렇게 머리로 이해해보니 Flow 패턴을 활용하여 코딩할 수 있을 것 같아 나만의 예시를 만들어 보았다.

/**
 *  Flow -> Delivery
 *      - collect -> take
 *  FlowCollector -> Taker
 *      - emit -> give
 */

interface Delivery<out T> {
    suspend fun take(taker: Taker<T>)
}

fun interface Taker<in T> {
    fun give(product: T)
}

fun <T> deliver(block: suspend Taker<T>.() -> Unit): Delivery<T> = SafeDelivery(block)

class SafeDelivery<T>(private val block: suspend Taker<T>.() -> Unit) : AbstractDelivery<T>() {
    override suspend fun giveSafely(taker: Taker<T>) {
        taker.block()
    }
}

abstract class AbstractDelivery<T> : Delivery<T> {
    final override suspend fun take(taker: Taker<T>) {
        val safeTaker = SafeTaker(taker, coroutineContext)
        giveSafely(safeTaker)
    }
    abstract suspend fun giveSafely(taker: Taker<T>)
}

class SafeTaker<T> constructor(
    @JvmField val taker: Taker<T>,
    @JvmField val collectContext: CoroutineContext
) : Taker<T> {
    override fun give(product: T) {
    	// Continuation 과 Safe 의 의미를 제외하고 그냥 함수를 실행시켰다.
        taker.give(product)
    }
}

Flow 개념을 따라해본 것인데, 흐름을 배달로 바꾸고 방출과 수집을 배달 상품을 주고 받는다는 의미로 바꿔 만들어보았다.

val delivery = deliver {
        give(10)
        delay(1000)
        give(20)
    }

    runBlocking {
        delivery.take {
            println("take: $it")
        }
    }

// 결과
// take: 10
// take: 20

잘 작동하는 것까지 확인해보았다.!!

profile
Android Developer

0개의 댓글