목차
- Intro
- Flow란?
- Flow의 구성
- 플로우 빌더
- 중간 연산자
- 최종 연산자
- Outro
선행지식
- Kotlin Coroutines
- Sequence
스타카토에서 LiveData를 Flow로 마이그레이션하고 있다. 스프린트 일정을 맞추느라 StateFlow와 SharedFlow의 개념만 간단히 이해한 채 사용했더니, 관련 지식이 부족하다는 느낌이 들었다.
따라서 이번 포스팅에서는 Flow에 대해 알아보려고 한다.

An asynchronous data stream that sequentially emits values and completes normally or with an exception.
Flow는 비동기 *데이터 스트림으로, 값을 순차적으로 전송한다. 비동기란 *동시성을 가지는 방식으로 작업 1을 수행하는 중에 작업 2를 실행할 수 있다.
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
Flow의 유일한 멤버 함수는 collect이다. 즉, Flow는 떠다니는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리한다. Flow의 collect는 컬렉션의 forEach와 비슷하다.
Kotlin Docs: Flow and Reactive Streams에 따르면, Flow는 Reactive Streams에서 영감을 받았다고 한다. Reactive Streams는 비동기적으로 데이터를 처리할 때, 너무 많은 데이터를 한 번에 전달하지 않도록 조절(back pressure)하면서 처리하는 방식을 정의한 표준이다. Reactive Stream과 Flow는 정의만 봐도 공통점이 많아 보인다.
Sequence의 최종 연산(forEach 등)은 중단 함수(suspend function)가 아니다. 시퀀스 빌더 내부에 중단점이 포함되어 있으면 해당 값을 기다리는 스레드는 블로킹된다. 이에 따라 시퀀스를 잘못 사용하면 성능 저하나 예기치 않은 동작이 발생할 수 있어, 중단 함수를 사용할 수 없다는 제약사항이 존재한다.
반면 Flow의 빌더와 연산은 모두 중단 함수로 구성되어 있으며, *구조화된 동시성과 적절한 예외 처리를 지원한다.
import kotlinx.coroutines.delay
...
fun getSequence(): Sequence<Int> = sequence {
repeat(3) {
yield(it)
delay(1000) // 컴파일 오류
}
}
import kotlinx.coroutines.delay
...
fun getFlow(): Flow<Int> = flow {
repeat(3) {
emit(it)
delay(1000)
}
}
Flow는 크게 플로우 빌더, 중간 연산자, 최종 연산자로 구성된다. 빌더로 Flow를 만들고 중간 연산자를 이용해 값을 가공한 뒤, 최종 연산자로 수집하거나 결과를 얻을 수 있다.
코틀린 코루틴 P. 266
Flow 빌더는 코루틴에서 Flow를 만드는 함수들이다. 기본적인 방법으로는 flowOf(…) , asFlow() , flow { … } , channelFlow { … } , MutableSharedFlow/MutableStateFlow 가 있다.
flowOf(…)는 정해진 값들로 간단하게 Flow를 만들 때 사용한다.
fun main() = runBlocking {
flowOf(1, 2, 3)
.collect { print(it) } // 123
}
flowOf()의 특징은 직접 값을 내보내지(emit()) 않아도 된다는 것이다. 내부 구현을 보면 이러한 점이 잘 드러난다.
flowOf(...) 내부 구현
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element) // emit()을 이용해 내부적으로 값을 내보냄
}
}
public fun <T> flowOf(value: T): Flow<T> = flow {
emit(value) // emit()을 이용해 내부적으로 값을 내보냄
}
asFlow() 는 Iterable, Iterator, Sequence를 flow로 변환한다.
fun main() = runBlocking {
listOf(1, 2, 3)
.asFlow()
.collect { println(it) } // 123
}
fun main() = runBlocking {
sequenceOf(1, 2, 3)
.asFlow()
.collect { print(it) } // 123
}
asFlow() 역시 flowOf() 와 마찬가지로 직접 값을 내보내지 않아도 된다. 이러한 특징은 내부 구현을 통해 확인할 수 있다.
asFlow() 내부 구현
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
flow { … } 는 플로우를 만들 때 가장 많이 사용되는 방법이다. 직접 값을 emit() 해서 임의의 flow를 생성할 수 있다.
fun main() = runBlocking {
flow {
emit(1)
emit(2)
emit(3)
}.collect { print(it) } // 123
}
flow { ... } 내부 구현
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
send 함수에 대한 잠재적으로 동시적인 호출로부터 임의의 flow를 생성한다.
이 설명만으로는 이해가 잘되지 않는다. 먼저 코드부터 살펴보자!
fun main() = runBlocking {
channelFlow {
send(1) // 원소를 생성하려면 emit 대신 send 사용
send(2)
send(3)
}.collect { print(it) } // 123
}
위 코드만 봐서는 원소를 생성할 때 emit 대신 send를 사용하는 것 외에는 flow { … }와 channelFlow { … }의 차이가 크게 느껴지지 않는다. 하지만 내부에서 별도의 코루틴을 실행해야 하는 상황에 사용하면 두 빌더의 차이점이 명확히 드러난다.
fun main() = runBlocking {
flow {
emit(1)
print(" [context 변경] ")
withContext(Dispatchers.IO) {
// 또는 launch {
emit(2)
emit(3)
}
}.collect { print(it) } // 1 출력 후 IllegalStateException 발생
}
flow 빌더 결과
1 [context 변경] Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: …
fun main() = runBlocking {
channelFlow {
send(1)
print(" [context 변경] ")
withContext(Dispatchers.IO) {
// 또는 launch {
send(2)
send(3)
}
}.collect { print(it) } // 1 출력 후 23도 정상적으로 출력
}
channelFlow 빌더 사용 결과
1 [context 변경] 23
flow 빌더를 사용했을 때는 Flow 불변성이 위반되었다는 IllegalStateException이 발생한다. 반면 channelFlow 빌더는 예외가 발생하지 않는다. 이 이유 역시 내부 구현을 살펴보면 금방 알 수 있다.
flow { ... } 내부 구현
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
public val channel: SendChannel<E>
}
channelFlow 빌더는 ProducerScope에서 동작하는 데, ProducerScope는 Coroutinescope를 상속받고 있기 때문에 새로운 코루틴을 시작할 수 있다.
따라서 channelFlow는 코루틴 스코프를 생성하여 launch와 같은 코루틴 빌더를 직접 시작하거나 context를 변경할 수 있다. 반면 flow는 코루틴 빌더가 필요로 하는 스코프를 만들지 못하기 때문에 IllegalStateException이 발생한다. 다른 코루틴처럼 channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 종료되지 않는다.
다시 channelFlow의 정의를 살펴보면, 이제 좀 더 쉽게 이해할 수 있을 것이다.
send 함수에 대한 잠재적으로 동시적인 호출로부터 임의의 flow를 생성한다.
channelFlow는 내부에서 여러 코루틴을 실행하여 동시에 값을 보낼 수 있다. 이러한 특성 때문에 동시성이 필요한 복잡한 비동기 작업에 유용하다.
MutableSharedFlow, MutableStateFlow
MutableStateFlow와 MutableSharedFlow에 대해서는 이번 글에서는 간단히 언급만 하고, 다음 포스팅에서 자세히 다룰 것이다.
직접 값을 업데이트할 수 있는 Hot Flow이다.
map { ... }, filter { ... }, take()와 같은 중간 연산자를 활용하면 최종 연산 전에 데이터를 원하는 형태로 가공할 수 있다. 많은 연산자가 Collection의 중간 연산자와 유사하다. 중간 연산자의 종류가 워낙 다양하기 때문에, 이 글에서는 모든 확장 함수를 하나하나 다루지는 않을 것이다.
flow의 다양한 연산자들은 공식문서에서 확인할 수 있다.
flow는 최종 연산자를 사용해야 값을 수집하거나 결과를 얻을 수 있다. 최종 연산자로는 가장 기본적인 연산자 collect()와 toSet(), toList(), first() 등이 있다.
Flow는 지연 연산(Lazy Evaluation)을 하므로, 최종 연산자가 호출되기 전까지는 실제로 아무 작업도 수행되지 않는다. 따라서 Flow를 사용할 때는 반드시 collect와 같은 최종 연산자를 호출해야 값이 처리된다.
val flow = flow {
print("시작 ")
emit(1)
emit(2)
emit(3)
print(" 종료")
}
fun main(): Unit = runBlocking {
flow.map { print(it) } // 아무것도 출력되지 않음
}
fun main() = runBlocking {
flow.collect { print(it) } // 시작 123 종료
}
이번 편에서는 Flow란 무엇이며, 어떻게 구성하는지 알아보았다. 다음 편에는 안드로이드 개발에서 상태 관리를 위해 자주 쓰이는 StateFlow와 SharedFlow에 대해 더 자세히 알아보겠다.