flow의 collect는 항상 호출부의 coroutine에서 발생한다.
이 특징을 context preservation이라고 한다.
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() = runBlocking {
simple().collect { value -> log("Collected $value") }
}
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
[main] Started simple flow
[main] Collected 1
[main] Collected 2
[main] Collected 3
simple().collect가 main 스레드에서 호출되었기 때문에 collect의 구현부 역시 main 스레드에서 동작한다.
cpu 자원을 오래 사용하는 코드는 Dispatchers.Default에서 주로 실행되고 (몰랐다!)
UI 업데이트 코드는 Main에서 주로 실행된다.
보통 kotlin coroutine에선 withContext를 통해 context를 교체하는데 flow 빌더 내부에선 context preservation을 지켜줘야 한다. emit()은 호출부 외에 다른 스레드에선 동작하지 않는다.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
// 다른 context에서 방출
withContext(Dispatchers.Default) {
emit(i)
}
}
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@fc87cb2, BlockingEventLoop@5bb9f24e],
but emission happened in [DispatchedCoroutine{Active}@4a034118, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
collect과 emit을 같은 context에서 하라는 Exception을 뱉는다.
위 Exception의 마지막에 flowOn이라는 키워드가 언급된다.
collect의 구현부와 flow 빌더 내의 작업을 다른 context에서 수행하기 위한 연산자이다.
fun simple(): Flow<Int> = flow {
withContext(Dispatchers.Default) {}
for (i in 1..3) {
Thread.sleep(100) // cpu 자원을 사용하는 척
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
flow 빌더 내의 작업은 cpu 자원을 사용하는 작업이 필요해서 Default에서 실행되어야할 때 위처럼 flowOn을 통해 context 교체를 할 수 있다.
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
flow builder의 upstream과 collect의 downstream 각각의 coroutine이 따로 정해져서 관리된다는 것을 알 수 있다. flowOn에 대해서 알아보자
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
ChannelFlow와 연관이 있어보인다. 다음에 알아봐야겠다.