코틀린에서 suspend 함수는 결과를 반환하지 않거나 단일 값을 반환하는 태스크를 수행하는 데만 유용하다는 단점이 있다. 코틀린 플로는 이를 보완하여 코루틴 기반 태스크로부터 얻은 순차적인 결과 스트림을 반환하는 기능을 제공한다.
플로는 코루틴 기반의 비동기 태스크들로부터 순차적으로 여러값을 반환할 수 있는 기능. 플로는 생산자, 중재자, 소비자로 구성된다. 생산자가 네트워크 커넥션으로부터 데이터 스트림을 꺼내어 플로로 방출하면 소비자는 플로 스트림의 반대편에 끝에 위치하며 생산자가 방출한 데이터를 수집한다. 중재자는 생산자와 소비자 사이에 위치하여 스트림을 필터링하거나 데이터에 추가적인 처리를 수행한다.

실제 플로를 사용할 때는 여러 생산자와 하나의 소비자, 하나의 생산자와 여러 소비자로 구성하는 등 다양한 상황이 존재할 수 있다.
이 프로젝트에서 플로는 하나의 뷰 모델 클래스 안에 위치한다. FlowDemoViewModel을 선언하고 MainActivity를 수정한다.
// FlowDemoViewModel.kt
class FlowDemoViewModel: ViewModel() {
}
//MainActivity.kt
import androidx.lifecycle.viewmodel.compose.viewModel
@Composable
fun ScreenSetUp(viewModel: FlowDemoViewModel = viewModel()) {
// viewModel()을 호출하면 알아서 FlowDemoViewModel이 생성됨
MainScreen()
}
@Composable
fun MainScreen() {
}
각 플로는 한 가지 타입의 데이터만 방출할 수 있으며, 이는 플로 선언 시 명시해야 한다. 다음은 String 기반 데이터를 스트리밍하는 Flow 인스턴스를 선언한 코드 예시다
Flow<String>
플로를 선언할 때는 데이터 스트림을 생성하는 코드를 해당 선언에 할당해야 한다. 이 코드를 생산자 블록이라 부른다. 이 작업은 flow() 빌더를 호출해서 수행하며, 이 빌더는 생산자 블록 코드를 포함한 코루틴 블록을 파라미터로 받는다. FlowDemoViewModel 파일을 다음과 같이 수정한다.
class FlowDemoViewModel: ViewModel() {
val myFlow: Flow<Int> = flow {
}
}
위의 코드는 하나의 정숫값 스트림을 방출하는 myFlow라는 이름의 플로를 방출한다. 위의 빌더 대신 flowOf() 빌더를 사용해 고정된 값의 집합을 하나의 플로로 변환할 수 있다.
val myFlow2 = flowOf(2, 4, 6, 8)
또한 asFlow() 확장 기능을 사용하면 포함된 데이터를 하나의 플로로 변환할 수 있다.
val myArrayFlow = arrayOf<String>("Red", "Green", "Blue").asFlow()
flowOf()와 asFlow() 빌더는 소비자가 수집을 시작하는 즉시 자동으로 데이터를 방출한다. 그러나 flow 빌더는 각 값이 사용 가능해졌을 때 emit() 함수를 호출하고 스트리밍할 값을 인자로 전달하는 형식으로 직접 방출해야 한다.
class FlowDemoViewModel: ViewModel() {
val myFlow: Flow<Int> = flow {
for (i in 0..9) {
emit(i)
delay(2000)
}
}
}
소비자 안에서 데이터를 수집할 때는 collect() 메서드를 호출하여 수집할 수 있다. 이 방법보다 덜 유연하지만 훨씬 편리한 방법으로, 플로 인스턴스에 대해 collectAsState() 함수를 호출해서 플로를 상태로 변환하여 데이터를 다룰 수 있다.
@Composable
fun ScreenSetUp(viewModel: FlowDemoViewModel = viewModel()) {
MainScreen(viewModel.myFlow)
}
@Composable
fun MainScreen(flow: Flow<Int>) {
val count by flow.collectAsState(initial = 0)
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
}
}
@Preview(showBackground = true)
@Composable
fun GreetingPreview6() {
MyApplicationTheme {
ScreenSetUp(viewModel())
}
}
위의 코드를 실행할 경우 count 값이 플로로 방출될 때마다 Text 컴포넌트에 표시되는 카운트값이 달라지는 것을 확인할 수 있다.

앞의 예시에서는 아무런 처리 없이 데이터를 소비자에게 전달했지만 중재자를 거쳐서 데이터를 가공하고 전달할 수도 있다. 다음은 map() 메서드를 사용하여 데이터를 가공하는 예시다.
class FlowDemoViewModel: ViewModel() {
val myFlow: Flow<Int> = flow {
for (i in 0..9) {
emit(i)
delay(2000)
}
}
//map() 메서드를 사용하여 데이터 가공
val newFlow = myFlow.map {
"Current value = $it"
}
}
//MainActivity.kt
@Composable
fun ScreenSetUp(viewModel: FlowDemoViewModel = viewModel()) {
MainScreen(viewModel.newFlow)
}
@Composable
fun MainScreen(flow: Flow<String>) {
val count by flow.collectAsState(initial = 0)
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
}
}
위의 코드를 실행하면 플로가 값을 방출할 때마다 Text 컴포넌트가 업데이트되는 것을 알 수 있다.

map() 메서드는 수집된 값마다 변환을 수행한다. filter() 메서드를 사용하면 값 전체를 대상으로 하여 제어할 수 있다. filter 코드는 Boolean 값을 반환하는 표현식을 포함해야 하고 참으로 평가될 때만 통과된다. 다음은 홀수를 필터링하는 코드이다.
val newFlow = myFlow
.filter {
it % 2 == 0
}
.map {
"Current value = $it"
}

transform() 메서드는 map()과 유사하지만 좀 더 유연하게 사용할 수 있으며 여러 값을 방출할 수도 있다.
val newFlow = myFlow
.transform {
emit("Value = $it")
delay(1000)
val doubled = it * 2
emit("Value doubled = $doubled")
}

collectAsState 함수를 사용하더라도 내부적으로는 collect() 함수를 사용해 데이터 컬렉션을 초기화한다. collectAsState()가 대부분 잘 동작하지만 이 메서드를 사용할 수 없는 경우가 존재하며 이럴 때는 collect() 메서드를 직접 호출해야 한다. collect() 메서드는 종단 플로 연산자 중 하나이다.
종단 플로 연산자들은 코루틴 스코프 안에서만 호출할 수 있다. 컴포저블 함수 내에서 안전하게 코루틴을 실행하기 위해 LaunchedEffect 함수를 구현하더라도 가장 최근 값을 저장할 수 있는 뮤터블 상태가 있어야 한다.
@Composable
fun MainScreen(flow: Flow<String>) {
var count by remember { mutableStateOf("Current value =") }
LaunchedEffect(Unit) {
flow.collect {
count = it
}
}
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
}
}
스트림이 종료되는 시점에 실행되는 코드를 추가할 때는 try/finally 구조를 사용해 실행하면 된다.
LaunchedEffect(Unit) {
try {
flow.collect {
count = it
}
} finally {
count = "Flow stream ended."
}
}

collect() 연산자는 소비자가 이전 값을 처리하고 있을 때 생산자가 새로운 값을 방출하더라도 모든 값을 수집한다. 이는 collect 연산자가 수집되지 않은 값들을 버리지 않기 때문이다. 이는 플로 안의 데이터 유실을 피하기 위해 필수적이다. 하지만 이러한 동작이 불필요한 경우도 존재할 수 있으며 용도에 따라 다른 방식으로 동작하는 연산자들이 존재한다.
만약 생산자에서 데이터를 방출할 때마다 추가적인 2초의 지연이 있고, 소비자에서 이를 처리하는 데 추가로 1초의 시간이 필요하다고 하자. 해당 플로 안에서 10개의 값을 처리하는 데에만 30초 가량의 시간이 필요하다.
생산자는 소비자가 다음 값을 처리하는 것을 기다리고 있기 때문에 비효율적이다. 생산자가 소비자의 처리 여부를 기다리지 않고 중간값을 유실해도 큰 문제가 없다면 collecLatest 또는 conflate 연산자를 사용할 수 있다. 방출되는 모든 값을 수집하면서 처리 속도를 높이고 싶을 때 buffer를 사용한다.
buffer 연산자는 값들이 방출되면 모아두었다가 소비자가 처리할 준비가 되면 하나씩 전달하기 때문에 생산자가 소비자를 기다리 필요도 없고 모든 값이 수집됨을 보장할 수 있다. 다음은 buffer를 사용하는 예시이다.
LaunchedEffect(Unit) {
val elapsedTime = flow
.buffer()
.collect {
count = it
delay(1000)
}
count = "Duration = $elapsedTime"
}
위처럼 실행하면 30초가 걸리던 일이 20초로 줄어들 것이다.
플로 평탄화는 어떤 태스크 자체가 하나 이상의 플로를 생성하면 플로의 플로가 생성되는 상황이 발생하고 이 스트림들을 단일 스트림으로 평탄화하는 것을 말한다.
val myFlow: Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}
fun doubleIt(value: Int) = flow {
emit(value)
delay(1000)
emit(value + value)
}
위의 코드에서 myFlow 스트림의 각 값에 대해 doubleIt()를 호출하면 각 값에 별도의 플로가 생성된다. 이 문제는 flatMapConcat() 연산자를 사용해 doubleIt() 스트림을 하나의 플로로 연결해서 해결할 수 있다.
var count by remember { mutableStateOf(0) }
LaunchedEffect(Unit) {
viewModel.myFlow
.flatMapConcat { viewModel.doubleIt(it) }
.collect { count = it }
}
flatMapConcat은 이전 Flow가 완료되기 전까지는 다음 Flow를 시작하지 않는다
zip()과 combine() 연산자를 사용해 여러 플로를 단일 플로로 조합할 수 있다.
@Composable
fun MainScreen(viewModel: FlowDemoViewModel) {
var count by remember { mutableStateOf("") }
LaunchedEffect(Unit) {
val flow1 = (1..5).asFlow()
.onEach { delay(1000) }
val flow2 = flowOf("one", "two", "three", "four")
.onEach { delay(1500) }
flow1.zip(flow2) { value, string -> "$value, $string" }
.collect{ count = it }
}
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
}
}
zip() 연산자는 두 플로 모두가 새로운 값을 방출한 뒤 수집을 수행한다. combine() 연산자는 두 플로 중 한 플로가 새로운 값을 방출할 때, 다른 플로가 새로운 값을 방출하지 않으면 가장 최근에 방출한 이전 값을 사용해 처리한다.
//zip()에서의 출력값
1, one
2, two
3, three
4, four
//combine()에서의 출력값
1, one
2, one
3, one
3, two
4, two
4, three
5, three
5, four
스테이트플로는 카운터, 토글 버튼, 슬라이더 등의 현재 설정과 같은 앱 안에서의 상태 변화를 관찰하는 데 사용된다. 각 StateFlow 인스턴스는 시간에 다라 변경되는 단일값을 저장하고 변경이 발생되면 모든 소비자에게 알려, 상탯값의 변경을 감지하는 코드를 작성하지 않아도 된다. 이는 LiveData와 유사하지만 LiveData는 라이프사이클을 인식하며 초깃값이 필요하지 않다는 차이가 있다.
StateFlow 스트림을 생성할 때는 먼저 MutableStateFlow의 인스턴스를 만들고 필수 초깃값을 전달한다. 이 변수는 앱 코드 안에서 현재 상탯값을 변경하는 데 사용된다. 그 후에 해당 인스턴스에 대해 asStateFlow()를 호출해서 StateFlow로 변환한다. 해당 플로가 활성화되면 상태를 소비할 수 있는데, 일반적으로 StateFlow에서는 collectLatest() 연산자를 사용해 상태를 수집하는 것을 권장한다.
class FlowDemoViewModel: ViewModel() {
private val _stateFlow = MutableStateFlow(0)
val stateFlow = _stateFlow.asStateFlow()
fun increaseValue() {
_stateFlow.value += 1
}
}
@Composable
fun MainScreen(viewModel: FlowDemoViewModel) {
val count by viewModel.stateFlow.collectAsState()
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
Button(onClick = { viewModel.increaseValue() }) {
Text("Click Me")
}
}
}
위의 코드를 실행시키면 버튼을 클릭할 때마다 count 값이 증가하는 것을 확인할 수 있다.
class FlowDemoViewModel: ViewModel() {
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 10,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val sharedFlow = _sharedFlow.asSharedFlow()
}
위의 코드에서 replay 값을 10으로 설정하면 과거 10개의 값을 받은 뒤 새로운 값을 받는다. 그리고 DROP_OLDEST으로 설정하여 10개보다 많은 값이 버퍼되면 가장 오래된 값부터 무시하도록 설정되어 있다. 버퍼에 관련된 다른 옵션은 DROP_LATEST(최근 값을 무시), SUSPEND(버퍼가 가득 차면 플로를 중지)가 있다.
값들은 코루틴 안에서 MutableSharedFlow 인스턴스의 emit() 메서드를 호출해서 스트림으로 방출된다.
// FlowDemoViewModel.kt
fun startSharedFlow() {
viewModelScope.launch {
for (i in 1..5) {
_sharedFlow.emit(i)
delay(2000)
}
}
}
//MainActivity.kt
@Composable
fun MainScreen(viewModel: FlowDemoViewModel) {
val count by viewModel.sharedFlow.collectAsState(initial = 0)
Column(
modifier = Modifier.fillMaxSize(),
verticalArrangement = Arrangement.Center,
horizontalAlignment = Alignment.CenterHorizontally
) {
Text(text = "$count", style = TextStyle(fontSize = 40.sp))
Button(onClick = { viewModel.startSharedFlow() }) {
Text("Click Me")
}
}
}
앱을 실행해서 버튼을 여러번 클릭해 여러 플로로부터 값들을 수집할 수 있는 것을 확인할 수 있다. 이는 새로운 값들은 하나의 코루틴 내부로부터 방출되므로 가능한 것이다. 마지막으로 셰어드플로에서 현재 구독자 수를 MutableSharedFlow 인스턴스의 subscribtionCount 프로퍼티를 통해 얻을 수 있다.
플로 타입을 사용해 선언된 스틀림을 콜드 플로라 부른다. 생산자 내부의 코드는 소비자가 값의 수집을 시작해야만 실행되기 시작한다.
스테이트플로와 셰어드플로는 핫 플로라 부른다. 생산자는 소비자가 값을 수집하는지에 관계없이 즉시 값을 방출하기 시작한다.
콜드 플로는 여러 컬렉터를 갖지 못하지만 핫 플로는 여러 컬렉터가 같은 플로에 대한 접근을 공유한다.
콜드 플로는 shareIn() 함수를 호출해 핫 플로로 만들 수 있다. 코루틴 스코프 내에서 실행하야 하며 사용할 수 있는 시작 정책 옵션은 다음과 같다.