[Kotlin in Action 2/e] 16장 플로우

왕왕조현·2026년 2월 19일

Kotlin in Action 2/e

목록 보기
16/18
post-thumbnail

안녕하세요!

플로우에 대한 정리글로 돌아온 개발자 꿈나무 김조현입니다.

이번 글에서는 플로우가 무엇인지, 플로우가 어떻게 구성되어 있는지 등에 대해 정리해보겠습니다.


플로우란?

플로우는 시간이 지남에 따라 나타나는 값과 작업할 수 있게 해주는 코루틴 기반의 추상화입니다. 플로우는 점진적인 로딩, 이벤트 스트림 작업, 구독 스타일 API를 모델링하는 데 사용할 수 있는 범용적인 추상화입니다.

플로우를 사용하면 시간이 지남에 따라 나타나는 여러 값을 다루는 상황에서 코틀린의 동시성 메커니즘을 활용할 수 있습니다.

플로우를 사용할 때는 flow 빌더 함수를 사용합니다. 플로우에 원소를 추가하려면 emit을 호출합니다. 빌더 함수 호출 후에는 collect 함수를 사용해 플로우의 원소를 순회할 수 있습니다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

fun createValues(): Flow<Int> {
	return flow {
		emit(1)
		delay(1000.milliseconds)
		emit(2)
		delay(1000.milliseconds)
		emit(3)
		delay(1000.milliseconds)
	}
}

fun main() = runBlocking {
	val myFlowOfValues = createValues()
	myFlowOfValues.collect{ log(it) }
}

// 120 [main @coroutine#1] 1
// 1137 [main @coroutine#1] 2
// 2137 [main @coroutine#1] 3

출력된 타임스탬프를 보면 원소가 배출되는 즉시 표시된다는 것을 알 수 있습니다. 즉, 모든 값을 계산할 때까지 기다릴 필요가 없는 것입니다.


플로우의 유형은?

플로우는 콜드 플로우와 핫 플로우라는 2가지 카테고리로 나뉩니다.

  • 콜드 플로우는 비동기 데이터 스트림으로, 값이 실제로 소비되기 시작할 때만 값을 배출합니다.

  • 핫 플로우는 값이 실제로 소비되고 있는지와 상관없이 값을 독립적으로 배출하며, 브로드캐스트 방식으로 동작합니다.


콜드 플로우란?

콜드 플로우는 flow라는 빌더 함수를 사용하여 생성할 수 있습니다. 빌더 함수 블록 안에서는 emit 함수를 호출해 플로우의 수집자에게 값을 제공하고, 수집자가 해당 값을 처리할 때가지 빌더 함수의 실행을 중단합니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

fun main() {
	val letters = flow {
		log("Emitting A!")
		emit("A")
		delay(200.milliseconds)
		log("Emitting B!")
		emit("B")
	}
}

이 코드를 동작시키면 아무런 출력도 나타나지 않습니다. 이는 빌더 함수가 연속적인 값의 스트림을 표현하는 Flow<T> 타입의 객체를 반환하기 때문입니다. 이 플로우는 처음에 비활성 상태이며, 최종 연산자가 호출돼야만 빌더에서 정의된 계산이 시작됩니다.

기본적으로 수집되기 시작할 때까지 비활성 상태입니다.

또한 빌더 함수 안의 코드는 플로우가 수집될 때만 실행되므로 시퀀스와 마찬가지로 무한 플로우를 정의하고 반환해도 괜찮습니다.

val counterFlow = flow {
	var x = 0
	while(true) {
		emit(x++)
		delay(200.milliseconds)
	}
}

콜드 플로우를 수집하는 작업은?

Flow에 대해 collect 함수를 호출하면 그 로직이 실행됩니다. 플로우를 수집하는 코드를 수집자라고 부르는데, collect를 호출할 때 플로우에서 배출된 각 원소에 대해 호출될 람다를 제공할 수 있습니다.

플로우를 수집할 때는 플로우 내부의 일시 중단 코드를 실행하므로 collect는 일시 중단 함수이며, 플로우가 끝날 때까지 일시 중단됩니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

val letters = flow {
	log("Emitting A!")
	emit("A")
	delay(200.milliseconds)
	log("Emitting B!")
	emit("B")
}

fun main() = runBlocking {
	letters.collect {
		log("Collecting $it")
		delay(500.milliseconds)
	}
}

// 132 [main @coroutine#1] Emitting A!
// 137 [main @coroutine#1] Collecting A
// 852 [main @coroutine#1] Emitting B!
// 852 [main @coroutine#1] Collecting B

출력의 타임스탬프를 보면 수집자가 플로우의 로직을 실행하는 책임이 있습니다. 원소 A와 B 사이의 지연 시간은 약 700밀리초입니다. 이는 수집자가 플로우 빌더에 정의된 로직의 실행을 촉발해서 첫 번째 배출을 발생시키고, 수집자와 연결된 람다가 호출되면서 메시지를 기록하고 500밀리초 동안 지연됩니다. 그 후 플로우 람다가 계속 실행되며 200밀리초 동안 추가 지연과 배출이 발생합니다.

콜드 플로우에서 collect를 여러 번 호출하면 그 코드가 여러 번 실행됩니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

fun main() = runBlocking {
	letters.collect {
		log("(1) Collecting $it")
		delay(500.milliseconds)
	}
	letters.collect {
		log("(2) Collecting $it")
		delay(500.milliseconds)
	}
}

// 126 [main @coroutine#1] Emitting A!
// 135 [main @coroutine#1] (1) Collecting A
// 845 [main @coroutine#1] Emitting B!
// 846 [main @coroutine#1] (1) Collecting B
// 1347 [main @coroutine#1] Emitting A!
// 1347 [main @coroutine#1] (2) Collecting A
// 2048 [main @coroutine#1] Emitting B!
// 2048 [main @coroutine#1] (2) Collecting B

collect 함수는 플로우의 모든 원소가 처리될 때까지 일시 중단됩니다.


플로우 수집을 취소하기

코루틴을 취소하는 메커니즘은 플로우 수집자에게도 적용됩니다. 수집자의 코루틴을 취소하면 다음 취소 지점에서 플로우 수집이 중단됩니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking {
	val collector = launch {
		counterFlow.collect {
			println(it)
		}
	}
	delay(5.seconds)
	collector.cancel()
}
// 0
// 1
// ...
// 24

콜드 플로우의 내부 구현은?

코틀린의 콜드 플로우는 일시 중단 함수와 수신 객체 지정 람다를 결합한 조합입니다. 콜드 플로우의 정의는 매우 간단하며 Flow와 FlowCollector라는 2가지 인터페이스만 필요합니다.

interface Flow<T> {
	suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<T> {
	suspend fun emit(value: T)
}

flow 빌더 함수를 사용해 플로우를 정의할 때 제공된 람다의 수신 객체 타입은 FlowCollector입니다. 이 때문에 빌더 안에서 emit 함수를 호출할 수 있고, emit 함수는 collect 함수에 전달된 람다를 호출하기에, 결과적으로 두 람다가 서로 호출하는 구조를 갖습니다.

val letters = flow {
	delay(300.milliseconds)
	emit("A")
	delay(300.milliseconds)
	emit("B")
}

letters.collect { letter ->
	println(letter)
	delay(200.milliseconds)
}

채널 플로우를 사용한 동시성 플로우는?

지금까지 flow 빌더 함수를 사용해 만든 콜드 플로우는 모두 순차적으로 실행되며, 코드 블록은 일시 중단 함수의 본문처럼 하나의 코루틴으로 실행됩니다. 하지만 async와 같은 동시성으로 수행하기 적합하고 병렬로 수행하면 더 좋을 것 같은 코드도 있을 것입니다.

하지만 그렇게 하면 오류가 나타납니다. 이는 콜드 플로우 추상화가 같은 코루틴 안에서만 emit 함수를 호출할 수 있게 허용하기 때문입니다. 이런 상황일 때 여러 코루틴에서 배출을 허용하는 동시성 플로우를 작성하게 해주는 빌더가 필요한데, 이런 플로우를 채널 플로우라고 합니다.

채널 플로우는 channelFlow 빌더 함수로 만들 수 있으며, 콜드 플로우의 특별한 유형입니다.

채널 플로우는 순차적으로 배출을 허용하는 emit 함수를 제공하지 않는 대신 여러 코루틴에서 send를 사용해 값을 제공할 수 있습니다. 플로우의 수집자는 여전히 값을 순차적으로 수신하며, collect 람다가 그 작업을 수행합니다.

import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

suspend fun getRandomNumber(): Int {
	delay(500.milliseconds)
	return Random.nextInt()
}

val randomNumbers = channelFlow {
	repeat(10) {
		launch {
			send(getRandomNumber())
		}
	}
}

fun main() = runBlocking {
	randomNumbers.collect {
		log(it)
	}
}

// 668 [main @coroutine#1] -1309714929
// 670 [main @coroutine#1] 481147455
// 670 [main @coroutine#1] 595734353
// 670 [main @coroutine#1] 1738511056
// 670 [main @coroutine#1] 1680382856
// 671 [main @coroutine#1] 468100645
// 671 [main @coroutine#1] -621640135
// 671 [main @coroutine#1] -2125216273
// 671 [main @coroutine#1] -1894456128
// 671 [main @coroutine#1] -1665117038

채널 플로우를 사용하면 동시적으로 실행되며 전체 실행 시간이 줄어드는 것을 확인할 수 있습니다.

일반적인 콜드 플로우와 채널 플로우 중 어떤 것을 쓸지 결정할 때는 플로우 안에서 새로운 코루틴을 시작하는 경우에만 채널 플로우를 선택하고, 그렇지 않으면 일반적인 콜드 플로우를 선택하는 편이 더 낫습니다.


핫 플로우

핫 플로우에서는 각 수집자가 플로우 로직 실행을 독립적으로 촉발하는 대신, 여러 구독자라고 불리는 수집자들이 배출된 항목을 공유합니다. 이는 시스템에서 이벤트나 상태 변경이 발생해서 수집자가 존재하는지 여부에 상관없이 값을 배출해야 하는 경우에 적합합니다.

핫 플로우는 항상 활성 상태이기 때문에 구도갖의 유무에 관계없이 배출이 발생할 수 있습니다. 코루틴에는 2가지 핫 플로우 구현이 기본적으로 제공됩니다.

  • 공유 플로우는 값을 브로드캐스트하기 위해 사용됩니다.
  • 상태 플로우는 상태를 전달하는 특별한 경우에 사용됩니다.

공유 플로우는?

공유 플로우는 구독자가 존재하는지 여부에 상관없이 배출이 발생하는 브로드캐스트 방식으로 동작합니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.*
import kotlin.time.Duration.Companion.milliseconds

class RadioStation {
	private val _messageFlow = MutableSharedFlow<Int>()
	val messageFlow = _messageFlow.asSharedFlow()
	
	fun beginBroadcasting(scope: CoroutineScope) {
		scope.launch {
			while(true) {
				delay(500.milliseconds)
				val number = Random.nextInt(0..10)
				log("Emitting $number!")
				_messageFlow.emit(number)
			}
		}
	}
}

핫 플로우를 만들 때는 플로우 빌더를 사용하는 대신 가변적인 플로우에 대한 참조를 얻습니다. 배출이 구독자 유무와 관계없이 발생하므로 당사자가 실제 배출을 수행하는 코루틴을 시작할 책임이 있습니다. 이는 별다른 어려움 없이 여러 코루틴에서 가변 공유 플로우에 값을 배출할 수 있다는 뜻이기도 합니다.

RadioStation 클래스의 인스턴스를 생성하고 beginBroadcasting 함수를 호출하면 구독자가 없어도 브로드캐스트가 즉시 시작됩니다.

fun main() = runBlocking {
	RadioStation().beginBroadcasting(this)
}

구독자를 추가하는 방법은 콜드 플로우를 수집하는 것과 동일하게 collect를 호출하면 됩니다. 배출이 발생할 때마다 제공한 람다가 실행되지만 구독자는 구독 시작 이후에 배출된 값만 수신한다는 점을 유의해야 합니다.

fun main() = runBlocking {
	val radioStation = RadioStation()
	radioStation.beginBroadcasting(this)
	delay(600.milliseconds)
	radioStation.messageFlow.collect {
		log("A collecting $it!")
	}
}

공유 플로우는 브로드캐스트 방식으로 작동하기 때문에 구독자를 추가해서 이미 존재하는 messageFlow의 배출을 수신할 수 있습니다. 하지만 공유 플로우 구독자는 구독을 시작한 이후에 배출된 값만 수신합니다. 구독자가 구독 이전에 배출된 원소도 수신하기를 원한다면 MutableSharedFlow를 생성할 때 replay 파라미터를 사용해 새 구독자를 위해 제공할 값의 캐시를 설정할 수 있습니다.

private val _messageFlow = MutableSharedFlow<Int>(replay = 5)

이렇게 바꾸고 나면 600밀리초가 지난 다음에 수집자를 시작하더라도 구독 직전에 발생한 최대 5개의 값을 수신할 수 있습니다.


콜드 플로우를 공유 플로우로 전환하기

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.*
import kotlin.time.Duration.Companion.milliseconds

fun querySensor(): Int = Random.nextInt(-10..30)

fun getTemperatures(): Flow<Int> {
	return flow {
		while(true) {
			emit(querySensor())
			delay(500.milliseconds)
		}
	}
}

이 코드는 500밀리초 간격으로 수집되는 값의 스트림을 제공하는 함수입니다. 이 함수를 여러 번 호출하려 한다면 각 수집자가 센서에 독립적으로 질의를 하게 됩니다.

fun celsiusToFahrenheit(celsius: Int) = celsius * 9.0 / 5.0 + 32.0

fun main() {
	val temps = getTemperatures()
	runBlocking {
		launch {
			temps.collect {
				log("$it Celsius")
			}
		}
		launch {
			temps.collect {
				log("${celsiusToFahrenheit(it)} Fahrenheit")
			}
		}
	}
}

이런 경우 반환된 플로우를 두 수집자가 공유해야하며, 이들 무도 같은 원소를 받아야 합니다.

shareIn 함수를 사용하면 주어진 콜드 플로우를 한 플로우인 공유 플로우로 변환할 수 있습니다. 이 변환은 플로우의 코드가 실행되게 하므로 shareIn을 코루틴 안에서 호출해야 합니다.

shareIn은 CoroutineScope 타입의 scope 파라미터와 플로우가 실제로 언제 시작돼야 하는지 정의하는 started 파라미터를 사용합니다.

started는 아래와 같은 여러 가지 다른 동작을 지정할 수 있습니다.

  • Eagerly는 플로우 수집을 즉시 시작합니다.
  • Lazily는 첫 번째 구독자가 나타나야만 수집을 시작합니다.
  • WhileSubscribed는 첫 번째 구독자가 나타나야 수집을 시작하고, 마지막 구독자가 사라지면 플로우 수집을 취소합니다.
fun main() {
	val temps = getTemperatures()
	runBlocking {
		val sharedTemps = temps.shareIn(this, SharingStarted.Lazily)
		launch {
			sharedTemps.collect {
				log("$it Celsius")
			}
		}
		launch {
			sharedTemps.collect {
				log("${celsiusToFahrenheit(it)} Fahrenheit")
			}
		}
	}
}

상태 플로우는?

상태 플로우는 변수의 상태 변화를 쉽게 추적할 수 있는 공유 플로우의 특별한 버전입니다. 상태 플로우를 생성하는 방법은 공유 플로우를 생성하는 것과 비슷합니다. 클래스의 private 속성으로 MutableStateFlow를 생성하고, 같은 변수의 읽기 전용 StateFlow버전을 노출합니다.

값을 배출하는 emit을 사용하는 대신, 값을 갱신하는 update 함수를 사용합니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

class ViewCounter {
	private val _counter = MutableStateFlow(0)
	val counter = _counter.asStateFlow()
	fun increment() {
		_counter.update{ it + 1 }
	}
}

fun main() {
	val vc = ViewCounter()
	vc.increment()
	println(vc.counter.value)
	// 1
}

가변 상태 플로우로 표현한 현재 상태를 value 속성으로 접근할 수 있습니다. 이 속성은 일시 중단 없이 값을 안전하게 읽을 수 있게 해줍니다.

또한 공유 플로우처럼 상태 플로우도 collect 함수를 호출해 시간에 따라 값을 구독할 수 있습니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

enum class Direction{ LEFT, RIGHT }

class DirectionSelector {
	private val _direction = MutableStateFlow(Direction.LEFT)
	val direction = _direction.asStateFlow()
	
	fun turn(d: Direction) {
		_direction.update{ d }
	}
}

fun main() = runBlocking {
	val switch = DirectionSelector()
	launch {
		switch.direction.collect {
			log("Direction now $it")
		}
	}
	delay(200.milliseconds)
	switch.turn(Direction.RIGHT)
	delay(200.milliseconds)
	switch.turn(Direction.LEFT)
	delay(200.milliseconds)
	switch.turn(Direction.LEFT)
}

이 코드를 출력해보면 LEFT라는 인자를 2번 연속으로 전달했음에도 구독자가 한 번만 호출된다는 점을 볼 수 있습니다. 이는 상태 플로우가 동등성 기반 통합을 수행하기 때문입니다. 즉, 값이 실제로 달라졌을 때만 구독자에게 값을 배출한다는 뜻입니다. 이전 값과 새 값이 같으면 배출이 발생하지 않습니다.


콜드 플로우를 상태 플로우로 변환하기

stateIn 함수를 사용해 콜드 플로우를 상태 플로우로 변환할 수 있습니다. 이렇게 하면 원래 플로우에서 배출된 최신 값을 항상 읽을 수 있습니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

fun main() {
	val temps = getTemperatures()
	runBlocking {
		val tempState = temps.stateIn(this)
		println(tempState.value)
		delay(800.milliseconds)
		println(tempState.value)
	}
}

마무리입니다!

이번 글에서는 플로우가 무엇인지, 어떻게 나눠지며 사용되는지 등에 대해 정리해봤습니다.

코루틴만 사용했을 때는 delay를 선언해도 여러 값을 순차적으로 가져오는 동작이 되지 않고, 코루틴 주기가 끝난 후에 한 번에 받아오는 것만이 가능했습니다. 하지만 플로우라는 개념을 배우고 데이터를 받는 즉시 불러올 수 있는 것을 알게 되었습니다.

또한 콜드 플로우와 핫 플로우라는 것을 사용하여 하나의 수집자 또는 여러 개의 수집자에게 값을 배출할 수 있어 상황에 따라 유연하게 플로우를 적용할 수 있다는 사실을 배웠습니다.

다음에는 플로우 연산자에 대한 정리글로 돌아오겠습니다.

읽어주셔서 감사합니다!🙂‍↕️

profile
천천히, 꾸준히, 한 걸음씩

0개의 댓글