Flow는 Collections, Sequence와 같이 연산자를 이용해 변환될 수 있다. 이러한 중간 연산자는 업스트림 Flow에 적용되어 다운스트림 Flow를 반환한다. 이러한 연산자들은 Flow만큼 차갑다. 연산자를 호출하는 것 자체는 suspend 함수가 아니다. 빠르게 작동하여 새롭게 변환된 Flow를 반환한다.
기본 중간 연산자들은 map
혹은 filter
와 같은 친숙한 이름을 가지고 있다. 이 연산자들이 Sequence에서 사용될 때와 차이점은 이 연산자들 내부 코드 블록에서는 suspend 함수를 호출할 수 있다는 점이다.
예를 들어 요청을 수행하는 것이 오래 걸리는 작업이고 suspend 기능으로 구현되어 있는 경우에도, 요청들을 받는 Flow를 map
연산자를 사용해 결과에 매핑할 수 있다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
위 코드의 결과는 아래와 같으면 각 줄은 이전 줄로부터 1초 후에 나타난다.
response 1
response 2
response 3
Flow 변환 연산자들 중 가장 일반적인 것은 transform
이다. map
이나 filter
와 같은 간단한 변환을 구현할 수도 있고 임의의 횟수만큼 값을 emit
할 수도 있다.
예를 들어 아래와 같이 오래걸리는 비동기 요청을 수행하기 전에 문자열을 방출(emit)하고 그 응답을 기다릴 수 있다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
이 외에도 방출 크기를 한정할 수 있는 연산자 take
등이 있다.
Flow의 터미널 연산자는 flow의 수집(collection)을 시작하는 일시정지 함수이다. 가장 기본적으로 collect
연산자가 있으며 이외에 toList
, toSet
, first
, single
, reduce
, fold
등 다양한 터미널 연산자가 존재한다.
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
15
특수한 연산자를 사용하지 않는 한 각 개별 Flow의 컬렉션은 순차적으로 동작한다. 여기서 컬렉션은 터미널 연산자를 호출하는 Coroutine에서 직접 동작하며 이때 기본적으로 어떠한 새로운 Coroutine도 실행되지 않는다.
방출된 각 값들은 transform
과 같은 중간 연산자들에 의해 업스트림에서 다운스트림으로 ㅓ리된 후 터미널 연산자에게 전달된다.
다음은 정수 중 짝수를 필터링한 후 문자열에 매핑하는 코드이다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow의 수집은 언제나 Coroutine을 호출하는 Context상에서 일어난다. 만약 simple
이라는 Flow가 있다면, 다음 코드의 simple
flow는 구체적인 구현과 상관없이 코드 작성자가 지정한 Context상에서 실행된다.
이러한 Flow의 성질을 컨텍스트 보존(context preservation)이라 부른다.
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
따라서 기본적으로 flow { ... }
빌더 내부의 코드는 해당 Flow의 collector가 제공하는 Context 상에서 실행된다. 예를 들어, simple
함수의 구현이 호출되는 스레드를 출력하고 3개의 숫자들을 방출한다고 해보자.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect
가 메인 스레드에서 호출되므로, simple
flow의 body 또한 메인 스레드에서 호출된다.
하지만 CPU를 오래 사용하는 코드는 Dispatchers.Default Context에서 실행되어야 할 수도 있고, UI를 업데이트하는 코드는 Dispatchers.Main Context에서 실행되어야 할 수 있다.
일반적으로 withContext
는 Coroutine을 사용하는 코드의 Context를 변경하는데 사용되지만, flow { ... }
빌더의 코드는 Context 보존 특성을 준수해야하므로 다른 Context에서 방출하는 것은 허용되지 않는다.
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
이 코드는 아래의 Exception을 생성한다.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
앞에서 언급한 것처럼 flow { ... }
빌더 생산자는 수집하는 CoroutineContext
에서 실행된다. 따라서 다른 CoroutineContext
에서 값을 emit
할 수 없다.
하지만 상황에 따라서 다른 Context
에서 Flow를 수집하거나 값을 방출해야할 수도 있다. 이렇게 Flow의 Context를 변경하려면 중간 연산자 flowOn
을 사용해야 한다.
flowOn
은 업스트림 흐름의 Context를 변경한다. 즉, 생산자 및 중간 연산자가 flowOn
전에 적용된다. 다운스트림 흐름 (flowOn
이후의 중간 연산자 및 소비자)는 영향을 받지 않으며 흐름에서 collect
하는데 사용되는 CoroutineContext
에서 실행된다. flowOn
연산자가 여러 개 있는 경우 각 연산자는 현재 위치에서 업스트림을변경한다.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
위 코드에서 onEach
및 map
연산자는 Dispatchers.Default
를 사용하는 반면, catch
연산자와 소비자는 viewModelScope
에 사용되는 Dispatchers.Main
에서 실행된다.