과제전형을 진행하면서 오랜만에 coroutine, flow를 많이 사용했는데 공식문서를 보고 공부한 적이 없어서 한번 읽어보면서 정리하려한다.
flow는 Asynchronous Flow란 이름으로 소개되어있다. 비동기적 '흐름'이란 말인데 설명을 보면, suspend fun으로 비동기적으로 하나의 값을 반환할 수 있는데, flow는 여러 값을 비동기적으로 반환하기 위해 탄생했다고 한다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
결과가 꽤 재밌다. 공식문서는 이 특징이 flow를 반환하는 simple fun에 suspend를 붙이지 않은 이유라고 한다. simple은 정의된 Flow 반환할 뿐이고, collect()로 구독한 순간부터 데이터를 반환하기 때문이다.
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
withTimeoutOrNull(250) {
flow.collect { value -> println(value) }
}
println("Calling collect again...")
flow.collect { value -> println(value) }
}
예제코드는 달랐는데 그냥 cancel에 대한 설명이라 이전 코드에 써봤다. timeout으로 인해 2까지 출력되고 collect가 취소된다.
flow를 생성하는 3가지 방법에 대해서 소개한다.
다양한 collection에 .asFlow() 확장함수를 통해 생성하거나
flowOf를 통해 생성한다.
가장 기본적인 방법은 flow{} block을 통해서 생성하는 것이다.
fun main() = runBlocking {
val list = listOf(1, 2, 3)
val flow1 = list.asFlow()
flow1.collect { value -> println(value) }
val flow2 = flowOf(*list.toTypedArray())
flow2.collect { value -> println(value) }
val flow3 = flow { list.forEach { value -> emit(value) } }
flow3.collect { value -> println(value) }
}
flow를 통해 비동기적으로 반환받은 값을 연산자를 통해 가공하여 사용할 수 있다. map과 filter같은 익숙한 연산자를 사용할 수 있는데 차이점은 코드블록 내부에서 suspend fun을 사용할 수 있단 점이다.
transform 연산자는 가장 일반적으로 사용되는 연산자로 map이나 filter같은 변환을 구현하거나 더 복잡한 변환도 할 수 있다.
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
transform의 구현은 위와 같다. 기존 flow에서 수집한 값을 변환해서 방출한다.
TCE를 위해 transform(value)를 반환한다고 되어있다. TCE는 Tail Call Elimination의 약자다.
함수의 마지막에 다른 함수를 호출할 때 스택을 최적화하기 위해 현재 함수의 스택 프레임을 제거하고 다음 함수를 호출한다.
하지만 kotlin의 suspend fun은 반환값을 명시하지 않으면 Unit 객체를 반환하기 때문에 최적화가 이뤄지지 않는다.
그래서 명시적으로 return 값을 선언해주어 최적화가 이뤄지게 만들었다.
- kludge는 편법, 임시방편이라는 뜻
// For internal operator implementation
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
위에서 safe flow라는 단어가 나와서 뭔가 싶었는데 unsafeFlow라고 사용자에겐 열려있지 않은 내부에서만 사용되는 함수가 있었다.
/**
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
* Used in our own operators where we trust the context of invocations.
*/
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
호출 context를 신용할 수 있는 자체 연산자에서만 사용해서 따로 실행의 context를 확인하지 않는다고 한다. 이 부분은 다음에 자세히 다뤄봐야겠다.
take()와 같은 중간 연산자는 구독하는 횟수를 제한할 수 있다.
coroutine의 취소는 항상 Exception을 던지는 방법으로 이뤄진다. 그래서 try-catch문 같은 자원관리 fun은 정상적으로 동작한다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
이 예제가 좀 재밌는게 이 코드를 통해 Hot Stream과 Cold Stream에 대해 더 쉽게 이해가 됐기 때문이다.
보통 취소를 생각하면 stream, 즉 flow는 그대로 있고 구독을 더이상 안하면 되는거 아닌가? 생각하게 되는데 이 코드는 flow를 취소시켜버린다.
그러면 이후의 값을 계속 받고 싶으면 어떻게 하지? -> 이게 Hot Stream의 흐름이다.
Cold Stream은 collect하는 순간부터 flow로서 시작된다. 코드를 약간 변형시켜보자
fun numbers(): Flow<Int> = flow {
try {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
} catch (e: Exception) {
println("cancel flow")
}
}
1
2
cancel flow
2개를 받은 이후에 flow를 취소시켜버린다. collect를 취소할 순 없나? collect는 suspend fun으로 flow가 아니다.
그리고 위의 cancel에서 봤던 한 문장이 이해가 되기 시작했다.
Flows adhere to the general cooperative cancellation of coroutines
fun main() = runBlocking<Unit> {
val job = launch {
launch {
numbers()
.collect { value ->
println(value)
}
}
delay(1000)
cancel()
}
}
collect를 수행하고 collect job을 1초후에 취소해보자
1
cancel flow
flow가 취소된다.
Cold Stream이라는 말이 좀 더 와 닿는다. 그리고 자연스럽게 하나의 구독자만 존재할 수 있다는걸 알 수 있다. 하나의 구독자만을 위해서 새로 생성되기 때문이다.
남은 특징들은 다음에 또 추가로 정리해봐야겠다.