
심명표 님의 코루틴 플로우 내부 살펴보기 1 를 읽고 코루틴 내부를 이해해본 내용을 정리하고자 한다.
처음엔 Cold Stream 과 Hot Stream의 차이점에 대해 알아보려 했는데 먼저 Flow에 대해 깊게 이해해보는것 부터 시작하고 싶었다.
Flow builder 를 가장 간단하게 만들 수 있는 방법이다.
flow<Int> {
emit(10)
delay(1000)
emit(20)
}
Flow Builder를 만드는 것은 어떤 의미인가??
저 빌더를 생성하고 있는 코드를 살펴보자.
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
여기서 주목할 것은 block 이다. 이 block은 FlowCollector<T>.() 를 수신기로 하고 있다. 수신기로 하고 있다는 말은 이 FlowCollector 인터페이스의 함수인 emit 을 미리 정의하고 이를 구현하는 객체를 받아와 나중에 실행 시킨다는 뜻이다.
그럼 이 FlowCollector 객체를 언제 생성할까??
눈치 챘을 지 모르겠지만 바로 collect 시점이다.
flow<Int> {
...
}.collect {
println("value: $it")
}
그렇다면 collect 의 내부 코드를 또 확인해봐야 한다.
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
여기서 봤을 때 우리가 항상 사용하는 collect 함수는 Flow 의 확장 함수로 만들어져있고 value를 인자로 한 람다함수를 매개변수로 받는다. 이 확장함수는 collect(object: FlowCollector<T> {... 이렇게 또 collect 함수를 실행하고 있다.
여기서부터 이해하는데 너무 힘들었다. 그렇다면 이 collect 의 출처는 또 어디인가?
Flow 를 상속받아 구현하고있는 클래스의 collect 일 것이다. 이 collect의 출처는 위에 flow builder에서 확인할 수 있었다.
바로 SafeFlow 를 확인해보자.
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
아직까진 뭐가 보이진 않는다. 다만 AbstractFlow 라는 클래스를 상속받고 있다는 것을 확인할 수 있다.
AbstractFlow도 그럼 확인해보자.
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
우리가 찾던 Flow를 구현하고 있는 클래스이다. 여기서 보면 collect를 override 하여 구현하고 있다. 그렇다면 인자로 받는 FlowCollector는 아까 collect 확장함수를 생성할 때 익명 클래스(anonymous class)로 생성한 것임을 이해할 수 있다.
그럼 구현된 collect 함수가 어떤 작업을 하는지 이해한다면 Flow의 내부 동작을 파악할 수 있을 것이다.
여기까지 잘 따라왔다면 이제 AbstractFlow의 collect 함수 내부를 보자.
SafeCollector 라는 클래스를 생성하고 collectSafely 함수에 인자로 넘겨주는걸 확인할 수 있다. SafeCollector는 또 뭔가....
@Suppress("UNCHECKED_CAST")
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
//...
}
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
//...
val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
//...
return result
}
}
SafeCollector는 FlowCollector와 ContinuationImpl을 상속받아 구현된 class 이다. 정확히 어떻게 동작하는지는 확실히 모르겠으나 적절한 인수로 함수를 호출하여 FlowCollector를 통해 안전하게 방출한다 정도로 이해하고 넘어가겠다.
본론으로 돌아가서
collectSafely?? 어딘가 익숙한데.. 이 abstract function 은 아까 AbstractFlow를 상속받은 SafeFlow에서 구현하고 있었다.
다시 확인해보자.
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
보면 collectSafely는 AbstractFlow에서 구현된 collect의 인자로 받은 FlowCollector를 가지고 만든 SafeCollector를 넘겨받아 생성자로 받은 block을 실행시키고 있다.
이게 어떤 의미냐면 이 collector는 flow {...}.collect 에서 collect 람다를 만들 때 생긴 FlowCollector를 참조하고 있고 block은 flow builder 즉 flow {...} 이 block을 말한다.
앞에서 block은 FlowCollector를 수신기로 사용하고 FlowCollector 객체를 받아와 나중에 실행한다고 말했는데, 결과적으로 collect 함수를 실행시켰을 때 생성된 FlowCollect를 넘겨주는 것이었다. 그리고 block에서 emit함수를 실행시키면 전달된 value를 collect 람다에서 받아 사용한다.
이렇게 머리로 이해해보니 Flow 패턴을 활용하여 코딩할 수 있을 것 같아 나만의 예시를 만들어 보았다.
/**
* Flow -> Delivery
* - collect -> take
* FlowCollector -> Taker
* - emit -> give
*/
interface Delivery<out T> {
suspend fun take(taker: Taker<T>)
}
fun interface Taker<in T> {
fun give(product: T)
}
fun <T> deliver(block: suspend Taker<T>.() -> Unit): Delivery<T> = SafeDelivery(block)
class SafeDelivery<T>(private val block: suspend Taker<T>.() -> Unit) : AbstractDelivery<T>() {
override suspend fun giveSafely(taker: Taker<T>) {
taker.block()
}
}
abstract class AbstractDelivery<T> : Delivery<T> {
final override suspend fun take(taker: Taker<T>) {
val safeTaker = SafeTaker(taker, coroutineContext)
giveSafely(safeTaker)
}
abstract suspend fun giveSafely(taker: Taker<T>)
}
class SafeTaker<T> constructor(
@JvmField val taker: Taker<T>,
@JvmField val collectContext: CoroutineContext
) : Taker<T> {
override fun give(product: T) {
// Continuation 과 Safe 의 의미를 제외하고 그냥 함수를 실행시켰다.
taker.give(product)
}
}
Flow 개념을 따라해본 것인데, 흐름을 배달로 바꾸고 방출과 수집을 배달 상품을 주고 받는다는 의미로 바꿔 만들어보았다.
val delivery = deliver {
give(10)
delay(1000)
give(20)
}
runBlocking {
delivery.take {
println("take: $it")
}
}
// 결과
// take: 10
// take: 20
잘 작동하는 것까지 확인해보았다.!!