기존의 Coroutine suspend
함수는 비동기적으로 단일 값만을 반환한다. 그렇다면 비동기적으로 계산된 복수의 값들을 반환하려면 어떻게 해야할까? 여기서 등장하는 것이 Kotlin의 Flow
개념이다.
기존 suspend 함수를 사용해서 복수의 값을 반환하는 함수를 만들어본다면 아래와 같이 만들 수 있다.
suspend fun simple(): List<Int> {
delay(1000) // 1초간 지연
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
이 코드는 1초간 기다린 다음 1, 2, 3을 순서대로 프린트한다.
하지만 결과적으로 List<Int>
타입을 사용하기 때문에 한번에 모든 값을 반환해야한다.
위 예시처럼 한번에 모든 값을 반환하는 방법말고 비동기적으로 계산된 값을 스트림으로 나타내기 위해서는 다른 방법이 필요하다. 이때 사용할 수 있는게 Flow<T>
타입이다.
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
위 코드를 실행하면 각 숫자들을 프린트하기 전 블로킹 되지 않은 메인 스레드에서 100ms마다 "I'm not blocked"를 출력한다. 그리고 별도의 Coroutine인 simple()
의 emit()
을 통해서 비동기적으로 값이 순차적으로 방출되어서 출력된다.
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
위 예제를 통해서 알 수 있는 Flow
의 특징은 아래와 같다.
Flow
의 빌더 함수는 flow
이다.flow { ... }
블록 내부의 코드들은 일시 중단될 수 있다.simple
함수는 더이상 suspend
수정자로 표시되어 있지 않다.emit
함수를 사용해 flow에서 값들이 방출된다.collect
함수를 사용해 flow로부터 값들을 수집한다.다른 예시로 Flow를 만들어보고 Flow 빌더에 대해서 살펴보자. 아래는 데이터 소스에서 일정한 주기로 최신 뉴스를 자동으로 가져오는 예시 코드이다.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the corouinte for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
빌더는 코루틴 내에서 실행된다. 따라서 몇가지 제한사항이 적용된다.
1. Flows는 순차적이다. 코루틴에 속한 생산자(producer)로써, suspend 함수를 호출할 때 생산자 suspend 함수가 결과를 리턴할때까지 Flow 역시 일시중단(suspend)된다. 위 예시에서는 fetchLatestNews
라는 네트워크 요청이 완료될때까지 일시중단된 뒤에 결과값을 emit
함수를 통해 stream 형태로 방출한다.
2. flow
빌더에서 생산자(producer)는 다른 CoroutineContext
로부터 값을 방출(emit)할 수 없다. 따라서 withContext
블록이나 다른 코루틴을 생성하여 다른 CoroutineContext
에서 emit
함수를 호출하면 안된다. 이 경우에는 callbackFlow
라는 flow
빌더를 사용할 수 있다.
flow { ... }
빌더는 Flows의 가장 기본적인 빌더이다. 또한 Flow를 선언할 수 있는 다른 빌더들도 존재한다.
flowOf
빌더는 정해진 값의 세트를 방출하는 Flow를 정의한다..asFlow()
확장 함수를 사용해 Flow로 변환될 수 있다.// 정수 범위를 flow로 변환한다.
(1..3).asFlow().collect { value -> println(value) }
Flow는 Sequence와 비슷한 차가운 Stream이다. flow
빌더 내부의 코드는 flow가 collect되기 전까지 실행되지 않는다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
// result
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
simple() 함수 그 자체는 호출하자마자 곧바로 반환된다. 이것이 flow가 suspend 함수가 아닌 이유다. flow는 collect가 될 때마다 새로 시작되며, 따라서 collect를 호출할때마다 "Flow started"가 출력된다.
Flow는 Coroutines의 기본적인 협력 취소를 따른다. 취소 가능한 suspend 함수 (delay
같은)에서 Flow가 일시중단될 때, Flow로부터 값을 수집하는 것이 취소될 수 있다. 아래 예는 Flow가 withTimeoutOrNull
블록에서 실행될 때, Flow가 시간 초과에 따라 어떻게 취소되고 코드 실행이 중지되는지 보여준다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout af
simple().collect { value -> println(value) }
}
println("Done")
}
위 코드를 실행하면 simple
함수의 flow에서 2개의 숫자만 방출되고 다음과 같이 출력된다.
Emitting 1
1
Emitting 2
2
Done
참고 및 출처
1. Android의 Kotlin 흐름
2. 비동기 Flow - GitBook