플로우는 어떤 연산을 실행할지 정의한 것입니다. 중단 가능한 람다식에 몇 가지 요소를 추가한 거라고 생각하면 됩니다. 이 장에서는 람다식을 변환하여 Flow 인터페이스와 flow 빌더를 어떻게 구현하는지 단계별로 보여 줄 것입니다.
간단한 람다식부터 시작해 보겠습니다. 각 람다식은 한 번만 정의되고 여러 번 호출할 수 있습니다.
fun main() {
val f: () -> Unit = {
print("A")
print("B")
print("C")
}
f() // ABC
f() // ABC
}
그럼 내부에 지연이 있는 람다식 suspend를 만들어 보겠습니다. 람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 호출할 수 없습니다.
suspend fun main() {
val f: suspend () -> Unit = {
print("A")
delay(1000)
print("B")
delay(1000)
print("C")
}
f()
f()
}
// A
// (1초 후)
// B
// (1초 후)
// C
// A
// (1초 후)
// B
// (1초 후)
// C
람다식은 함수를 나타내는 파라미터를 가질 수 있습니다. 이 파라미터를 emit이라고 해 봅시다. 람다식 f를 호출할 때 emit으로 사용될 또 다른 람다식을 명시해야 합니다.
suspend fun main() {
val f: suspend ((String) -> Unit) -> Unit = { emit ->
emit("A")
emit("B")
emit("C")
}
f { print(it) } // ABC
f { print(it) } // ABC
}
이때 emit은 중단 함수가 되어야 합니다. 함수형이 많이 복잡해진 상태이므로, emit이라는 추상 메서드를 가진 FlowCollector 함수형 인터페이스를 정의해 간단하게 만들어 봅시다. 함수형 인터페이스는 람다식으로 정의할 수 있기 때문에 f 호출을 바꿀 필요가 없습니다.
import kotlin.*
fun interface FlowCollector {
suspend fun emit(value: String)
}
suspend fun main() {
val f: suspend (FlowCollector) -> Unit = {
emit("A")
emit("B")
emit("C")
}
f { print(it) } // ABC
f { print(it) } // ABC
}
람다식을 전달하는 대신에, 인터페이스를 구현한 객체를 만드는 편이 낫습니다. 이때 인터페이스를 Flow라 하고, 해당 인터페이스의 정의는 객체 표현식으로 래핑하면 됩니다.
import kotlin.*
fun interface FlowCollector {
suspend fun emit(value: String)
}
interface Flow {
suspend fun collect(collector: FlowCollector)
}
suspend fun main() {
val builder: suspend FlowCollector.() -> Unit = {
emit("A")
emit("B")
emit("C")
}
val flow: Flow = object : Flow {
override suspend fun collect(
collector: FlowCollector
) {
collector.builder()
}
}
flow.collect { print(it) } // ABC
flow.collect { print(it) } // ABC
}
마지막으로 플로우 생성을 간단하게 만들기 위해 flow 빌더를 정의합니다.
import kotlin.*
fun interface FlowCollector {
suspend fun emit(value: String)
}
interface Flow {
suspend fun collect (collector: FlowCollector)
}
fun flow(
builder: suspend FlowCollector.() -> Unit
) = object : Flow {
override suspend fun collect(collector: FlowCollector) {
collector.builder()
}
}
suspend fun main() {
val f: Flow = flow {
emit("A")
emit("B")
emit("C")
}
f.collect { print(it) } // ABC
f.collect { print(it) } // ABC
}
마지막으러 타입에 상관없이 값을 방출하고 모으기 위해 String을 제네릭 타입으로 바꿉니다.
import kotlin.*
fun interface FlowCollector<T> {
suspend fun emit(value: T)
}
interface Flow<T> {
suspend fun collect(collector: FlowCollector<T>)
}
fun <T> flow(
builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
override suspend fun collect(
collector: FlowCollector<T>
) {
collector.builder()
}
}
suspend fun main() {
val f: Flow<String> = flow {
emit("A")
emit("B")
emit("C")
}
f.collect { print(it) } // ABC
f.collect { print(it) } // ABC
}
collect를 호출하면, flow 빌더를 호출할 때 넣은 람다식이 실행됩니다. 빌더의 람다식이 emit을 호출하면 collect가 호출되었을 때 명시된 람다식이 호출됩니다.
Flow는 리시버가 있는 중단 람다식에 비해 훨씬 복잡하다고 여겨집니다. 하지만 플로우의 강력한 점은 플로우를 생성하고, 처리하고, 그리고 감지하기 위해 정의한 함수에서 찾을 수 있습니다.
간단하게 flow 메서드인 map의 구현을 살펴보도록 합시다
이 함수는 새로운 플로우를 만들기 때문에, flow 빌더를 사용합니다. 플로우가 시작되면 래핑하고 있는 플로우를 시작하게 되므로, 빌더 내부에서 collect 메서드를 호출합니다. 원소를 받을 때마다, map은 원소를 변환하고 새로운 플로우로 방출합니다.
fun <T, R> Flow<T>.map(
transformation: suspend (T) -> R
): Flow<R> = flow {
collect {
emit(transformation(it))
}
}
suspend fun main() {
flowOf("A", "B", "C")
.map {
delay(100)
it.lowercase()
}.collect { println(it) }
}
// (1 초 후)
// a
// (1초 후)
// b
// (1초 후)
// c
앞으로 소개할 대부분의 flow 메서드들의 작동 방식도 map과 비슷하게 간단합니다. 어떻게 작동하는지 쉽게 이해할 수 있으며, 비슷한 함수를 작성하는데 도움이 됩니다.
플로우 또한 중단 함수처럼 동기로 작동하기 때문에, 플로우가 완료될 때까지 collect 호출이 중단됩니다. 즉, 플로우는 새로운 코루틴을 시작하지 않습니다. 중단 함수가 코루틴을 시작할 수 있는 것처럼, 플로우의 각 단계에서도 코루틴을 시작할 수 있지만 중단 함수의 기본 동작은 아닙니다.
플로우에서 각각의 처리 단계는 동기로 실행되기 때문에, onEach 내부에서 delay가 있으면 모든 원소가 처리되기 전이 아니 각 원소 사이에 지연이 생깁니다.
suspend fun main() {
flowOf("A","B","C")
.onEach { delay(1000) }
.collect { println(it) }
}
// (1초 후)
// A
// (1초 후)
// B
// (1초 후)
// C
플로우 처리를 통해 좀더 복잡한 알고리즘을 구현할 때는 언제 변수에 대한 접근을 동기화해야 하는지 알아야 합니다.
다음 예제들을 통해 flow의 공유 상태에 대해 살펴보겠습니다.
커스텀 플로우 처리 함수를 구현할 때, 플로우의 각 단계가 동기로 작동하기 때문에 동기화 없이도 플로우 내부에서 변경 가능한 상태를 정의할 수 있습니다.
다음 예제에서는 동기로 작동하기에 결과값이 일정하게 나오는 것을 보여주고 있습니다.
fun Flow<*>.counter() = flow<Int> {
var counter = 0
collect {
counter++
}
// 잠깐 동안 바쁘게 만듭니다.
List(100) { Random.nextLong() }.shuffled().sorted()
emit(counter)
}
}
suspend fun main(): Unit = coroutineScope {
val fl = List(1000) { "$it" }.asFlow()
val f2 = List(1000) { "$it" }.asFlow()
.counter()
launch { println(f1.counter().last()) } // 1000
launch { println(f1.counter().last()) } // 1000
launch { println(f2.last())} // 1000
launch { println(f2.last())} // 1000
}
그러나 플로우 단계의 외부 변수를 추출해서 함수에서 사용하면 해당 값은 동기적으로 작동하지 않습니다.
외부 변수는 같은 플로우가 모으는 모든 코루틴이 공유하게 됩니다. 이런 경우 동기화가 필수이며 플로우 컬렉션이 아니라 플로우에 종속되게 됩니다.
따라서 두개의 코루틴이 병렬로 원소를 세개 되고, f2.last()는 1000이 아니라 2000을 반환하게 됩니다.
fun Flow<*>.counter(): Flow<Int> {
var counter = 0
return this.map {
counter++
// 잠깐 동안 바쁘게 만듭니다
List(100) { Random.nextLong() }.shuffled().sorted()
counter
}
}
suspend fun main(): Unit = coroutineScope {
val fl = List(1_000) { "$it" }.asFlow()
val f2 = List(1_000) { "$it" }.asFlow()
.counter()
launch { println(fl.counter().last()) } // 1000
launch { println(fl.counter().last()) } // 1000
launch { println(f2.1ast()) } // 2000보다 작은 값이 출력됩니다
launch { println(f2.1ast()) } // 2000보다 작은 값이 출력됩니다
}
플로우는 어디선가 시작되어야 합니다. 플로우가 필요한 경우에 따라 플로우를 시작하는 방법은 다양합니다. 이번 장에서는 플로우를 시작하는 중요한 방법 몇가지를 소개하고자 합니다.
플로우를 만드는 가장 간단한 방법은 플로우가 어떤 값을 가져야 하는지 정의하는 flowOf 함수를 사용하는 것입니다.
suspend fun main() {
flowOf(1, 2, 3, 4, 5)
.collect { print(it) } // 12345
}
값이 없는 플로우가 필요한 경우도 있습니다. 이런 경우에는 emptyFlow() 함수를 사용하면 됩니다.
suspend fun main() {
emptyFlow<Int>()
.collect { print(it) } // 아무것도 출력되지 않옴
}
asFlow 함수를 사용해서 Iterable, Iterator, Sequence를 Flow로 바꿀 수 있습니다.
suspend fun main() {
listOf(lr 2f 3, 4, 5)
// 또는 setOf(lr 2, 3, 4, 5)
// 또는 sequenceOf (1, 2, 3, 4, 5)
.asFlow()
.collect { print(it) } // 12345
}
asFlow 함수는 즉시 사용 가능한 원소들의 플로우를 만듭니다. 플로우 처리 함수를 사용해 처리 가능한 원소들의 플로우를 시작할 때 유용합니다.
플로우는 시간상 지연되는 하나의 값을 나타낼 때 자주 사용됩니다. 따라서 중단 함수를 플로우로 변환하는 것 또한 가능합니다. 이때 중단 함수의 결과가 플로우의 유일한 값이 됩니다. 중단 함수를 플로우로 바꾸고 싶다면, 함수형의 확장 함수인 asFlow를 사용할 수 있습니다. 다음 예제에서는 중단 람다식을 Flow로 변환하기 위해 asFlow를 사용합니다.
suspend fun main(){
val function = suspend {
// 중단 함수를 람다식으로 만든 것입니다
delay(1000)
"UserName"
}
function.asFlow()
.collect { println(it) }
}
// (1 초 후)
// UserName
일반 함수를 변경할려면 함수 참조값이 필요합니다. 코틀린에서는 ::를 사용해서 참조할 수 있습니다.
suspend fun getUserName(): String {
delay(1000)
return "UserName"
}
suspend fun main() {
::getUserName
.asFlow()
.collect { println(it) }
}
// (1 초 후)
// UserName
플로우를 만들 때 가장 많이 사용하는 방법은 flow 빌더를 통해 만드는 것입니다. flow 빌더는 시퀀스를 만드는 Sequence 빌더나 채널을 만드는 produce 빌더와 비슷하게 작동합니다. 빌더는 flow 함수를 먼저 호출하고, 람다식 내부에서 emit 함수를 사용해 다음 값을 방출합니다. Channel이나 Flow에서 모든 값을 방출하려면 emitAll을 사용할 수 있습니다.
참고로 emitAll(flow)는 flow.collect { emit(it) }를 줄여 쓴 것입니다.
fun allUsersFlow(
api: UserApi
): Flow<User> = flow {
var page = 0
do {
val users = api.takePage(page++) // 중단 함수
emitAll(users)
} while (!users.isNullOrEmpty())
}
Flow는 콜드 데이터 스트림이므로 필요할 때만 값을 생성합니다. 앞에서 봤던 allUserFlow를 떠올려 보면 사용자 목록의 다름 페이지는 리시버가 필요로 할 때 요청합니다.
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>
}
class FakeUserApi : UserApi {
private val users = List(20) { User(MUser$it") }
private val pagesize: Int = 3
override suspend fun takePage(
pageNumber: Int
): List<User> {
delay(1000) // 중단 항수
return users
.drop(pageSize * pageNumber)
.take(pagesize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage( page++) // 중단 함수
emitAll(users.asFlow())
} while (!users.isNullOrEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay (1000) // 중단 함수
it.name == "UserB"
}
println(user)
}
// Fetching page 0
// (1 초 후)
// Checking User(name=User0)
// (1초 후)
// Checking User(name=User1)
// (1 초 후)
// Checking User(name=User2)
// (1초 후)
// Fetching page 1
// (1 초 후)
// Checking User(name=User3)
// (1 초 후)
// User(name=User3)
하지만 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우가 있습니다. 네트워크 호출을 더 빈번하게 한다는 단점이 있지만 결과를 더 빠르게 얻어올 수 있습니다. 이는 채널의 핫 데이터 스트림의 전형적인 특징입니다. 따라서 채널과 플로우를 합친 형태가 필요합니다.
channelFlow 함수는 플로우처럼 Flow 인터페이스를 구현하기 때문에 플로우가 가지는 특징을 제공합니다. 채널플로우 빌더는 일반 함수이며 (collect와 같은) 최종 연산으로 시작됩니다. 한 번 시작하기만 하면 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성한다는 점엣 채널과 비슷하다고 할 수 있습니다.
따라서 다음 페이지를 얻어오는 동시에 사용자를 확인할 수 있습니다.
fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("Fetching page $page")
val users = api. takePage(page++) // 중단 함수
users?.forEach { send(it) }
} while (!users.isNullOrEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allllsersFlow(api)
val user = users
.first {
printInC'Checking $it")
delay(1000)
it.name == "User3"
}
println(user)
}
// Fetching page 0
// (1 초 후)
// Checking User(name=User0)
// Fetching page 1
// (1 초 후)
// Checking User(name=User1)
// Fetching page 2
// (1 초 후)
// Checking User(name=User2)
// Fetching page 3
// (1 초 후)
// Checking User(name=User3)
// Fetching page 4
// (1초 후)
// User(name=User3)
channelFlow는 ProduceScope에서 작동합니다. ProducerScope는 produce 빌더가 사용하는 것과 같은 타입입니다.
ProducerScope는 CoroutineScope를 구현했기 때문에 빌더에서 새로운 코루틴을 시작할 때 사용할 수 있습니다. 원소를 생성하려면 emit 대신 send를 사용합니다. 채널에 접근해 SendChannel 함수로 직접 조작할 수 있습니다.
interface ProducerScope<in E>: Coroutinescope, SendChannel<E> {
val channel: SendChannel<E>
}
여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용합니다. channelFlow는 코루틴 스코프를 생성해 launch와 같은 코루틴 빌더를 직접 시작할 수 있습니다. flow는 코루틴 빌더가 필요로 하는 스코프를 만들지 못하기 때문에 다음 코드를 실행할 수 없습니다.
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> =
channelFlow {
launch {
collect { send(it) }
}
other.collect { send(it) }
}
fun <T> contextualFIow(): Flow<T> = channelFlow {
launch(Dispatchers.IO) {
send(computeloValue())
}
launch(Dispatchers.Default) {
send(computeCpuValue())
}
}
다른 코루틴처럼 channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 끝나지 않습니다.
callbackFlow는 콜백 함수를 래핑하여 Flow를 반환할 수 있습니다. callbackFlow는 ProducerScope에서 작동합니다. 다음은 콜백을 래핑하는데 유용한 몇가지 함수입니다.
다음은 callbackFlow가 사용되는 전형적인 방법입니다.
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
trySendBlocking(value)
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
api.register(callback)
awaitClose { api.unregister(callback) }
}
만약 awaitClose가 없다면 콜백을 등록하고 나서 코루틴은 곧바로 끝나게 됩니다. callbackFlow 빌더의 실행이 끝나고 기다릴 자식 코루틴이 없다면 코루틴은 종료됩니다. awaitClose를 사용해 코루틴이 종료되는 것을 막을 수 있고, 채널이 닫힐 때까지 어떤 방식으로든 간에 원소를 감지합니다.
플로우는 요청이 한쪽 방향으로 흐르고 요청에 의해 생성된 값이 다른 방향으로 흐르는 파이프라 생각할 수 있습니다. 플로우가 완료되거나 예외가 발생했을 때, 이러한 정보가 전달되어 중간 단계가 종료됩니다. 모든 정보가 플로우로 전달되므로 값, 예외 및 다른 특정 이벤트를 감지할 수 있습니다. 그리고 지금부터, 해당 정보들을 얻을 때 사용하는 메서드들을 살펴보겠습니다.
플로우의 값을 하나씩 받기 위해 onEach 함수를 사용합니다.
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach { print(it) }
.collect() // 1234
}
onEach 람다식은 중단 함수이며, 원소는 순서대로 처리됩니다. 따라서 onEach에 delay를 넣으면 각각의 값이 흐를 때마다 지연되게 됩니다.
onStart 함수는 최종 연산이 호출될 때와 같이 플로우가 시작되는 경우에 호출되는 리스너를 설정합니다. onStart는 첫 번째 원소가 생성되는 걸 기다렸다 호출되는 게 아니라는 것이 중요합니다. 첫 번째 원소를 요청했을 때 호출되는 함수입니다.
suspend fun main() {
flowOf(1, 2)
.onEach { delay(1000) }
.onStart { println("Before") }
.collect { println(it) }
}
// Before
// (1초 후)
// 1
// (1초 후)
// 2
(onCompletion, onEmpty, catch와 같이) onStart에서도 원소를 보낼 수 있습니다. 원소들은 onStart부터 아래로 흐르게 됩니다.
suspend fun main() {
flowOf(1, 2)
.onEach { delay(1000) }
.onStart { emit(0) }
.collect { println(it) }
}
// 0
// (1초 후)
// 1
// (1초 후)
// 2
플로우를 완료할 수 있는 여러 방법이 있습니다. 잡히지 않는 예외가 발생했거나 코루틴이 취소되었을 때도 포함되지만, 가장 흔한 방법은 플로우 빌더가 끝났을 때 입니다. onCompletion 메서드를 사용해 플로우가 완료되었을 때 호출되는 리스너를 추가할 수 있습니다.
suspend fun main() = coroutineScope {
flowOf(1, 2)
.onEach { delay(1000) }
.onCompletion { println("Completed") }
.collect { println(it) }
}
// (1 초 후)
// 1
// (1 초 후)
// 2
// Completed
suspend fun main() = coroutineScope {
val job = launch {
flowOf(1, 2)
.onEach { delay(1000) }
.onCompletion { println("Completed") }
.collect { println(it) }
}
delay(1100)
job.cancel()
}
// (1 초 후)
// 1
// (0.1 초 후)
// Completed
플로우는 예기치 않은 이벤트가 발생하면 값을 내보내기 전에 완료될 수 있습니다. onEmpty 함수는 원소를 내보내기 전에 플로우가 완료되면 실행됩니다. onEmpty는 기본값을 내보내기 위한 목적으로 사용됩니다.
suspend fun main() = coroutineScope {
flow<List<Int>> { delay(1000) }
.onEmpty { emit(emptyList()) }
.collect { println(it) }
}
// (1 초 후)
// []
플로우를 만들거나 처리하는 도중에 예외가 발생할 수 있습니다. 이러한 예외는 아래로 흐르면서 처리하는 단계를 하나씩 닫습니다. 하지만 catch 메서드를 이용해 예외를 잡고 관리할 수 있습니다. 해당 메서드는 예외를 인자로 받고 정리를 위한 연산을 수행할 수 있습니다.
class MyError : Throwable("My error")
val flow = flow {
emit(1)
emit(2)
throw MyError()
}
suspend fun main(): Unit {
flow.onEach { println ("Got $it") }
.catch { println("Caught $it") }
.collect { println("Collected $it") }
}
// Got 1
// Collected 1
// Got 2
// Collected 2
// Caught MyError: My error
💡
위 예제에서 onEach는 예외에 반응하지 않습니다. map, filter와 같은 다른 함수에서도 마찬가지입니다. 오직 onCompletion 핸들러만 예외가 발생했을 때 호출됩니다.
catch 메서드는 예외를 잡아 전파되는 것을 멈춥니다. 이전 단계는 이미 완료된 상태지만, catch는 새로운 값을 여전히 내보낼 수 있어 남은 플로우를 지속할 수 있습니다.
val flow = flow {
emit("Message1")
throw MyError()
}
suspend fun main(): Unit {
flow.catch { emit("Error") }
.collect { println("Collected $it") }
}
// Collected Message1
// Collected Error
플로우에서 잡히지 않은 예외는 플로우를 즉시 취소하며, collect는 예외를 다시 던집니다. 중단 함수가 예외를 처리하는 방식과 같으며, coroutineScope 또한 같은 방식으로 예외를 처리합니다. 플로우 바깥에서 전통적인 try-catch 블록을 사용해서 예외를 잡을 수 있습니다.
val flow = flow {
emit("Message1")
throw MyError()
}
suspend fun main(): Unit {
try {
flow.collect { println("Collected $it") }
} catch (e: MyError) {
println("Caught")
}
}
// Collected Message1
// Caught
catch를 사용하는건 최종 연산에서 발생한 예외를 처리하는데 전혀 도움이 되지 않습니다. 따라서 collect에서 예외가 발생하면 예외를 잡지 못하게 되어 블록 밖으로 예외가 전달됩니다.
val flow = flow {
emit("Message1")
emit("Message2")
}
suspend fun main(): Unit {
flow.onStart { println("Before") }
.catch { println("Caught $it") }
.collect { throw MyError() }
}
}
// Before
// Exception in thread MyError: My error
onEach, onStart, onCompletion과 같은 플로우 연산과 flow나 channelFlow와 같은 플로우 빌더의 인자로 사용되는 람다식은 모두 중단 함수입니다. 중단 함수는 컨텍스트가 필요하며(구조화된 동시성을 위해) 부모와 관계를 유지합니다. 플로우의 함수들이 어디서 컨텍스트를 얻어올까요? 답은 collect가 호출되는 곳의 컨텍스트입니다.
fun usersFlow(): Flow<String> = flow {
repeat(2) {
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
emit("User$it in $name")
}
}
suspend fun main() {
val users = usersFlow()
withContext(CoroutineName("Name1")) {
users.collect { println(it)}
}
withContext(CoroutineName("Name2")) {
users.collect { println(it) }
}
}
// UserO in Name1
// User1 in Name1
// UserO in Name2
// User1 in Name2
최종 연산을 호출하면 상위에 있는 모든 원소를 요청하면서 코루틴 컨텍스트를 전달합니다. 하지만 flowOn 함수로 컨텍스트를 변경할 수도 있습니다.
suspend fun present(place: String, message: String) {
val ctx = coroutineContext
val name = ctx[CoroutineName]?.name
println("[$name] $message on $place")
}
fun messagesFlow(): Flow<String> = flow {
present("flow builder", "Message")
emit("Message")
}
suspend fun main() {
val users = messagesFlow()
withContext(CoroutineName("Name1")) {
users
.flowOn (CoroutineName ("Name3"))
.onEach { present("onEach", it) }
.flowOn(CoroutineName("Name2"))
.collect { present("collect", it) }
}
}
// [Name3] Message on flow builder
// [Name2] Message on onEach
// [Name1] Message on collect
flowOn은 플로우에서 윗부분에 있는 함수에서만 작동하는 걸 기억해야 합니다.
collect는 플로우가 완료될 때까지 코루틴을 중단하는 중단 연산입니다. launch 빌더로 collect를 래핑하면 플로우를 다른 코루틴에서 처리할 수 있습니다. 플로우의 확장 함수인 launchIn을 사용하면 유일한 인자로 스코프를 받아서 collect를 새로운 코루틴에서 시작할 수 있습니다.
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job =
scope.launch { collect() }
별도의 코루틴에서 플로우를 시작하기 위해 launchIn을 주로 사용합니다.
suspend fun main(): Unit = coroutinescope {
flowOf("User1", "User2")
.onStart { println("Users:") }
.onEach { println(it) }
.launchIn(this)
}
// Users:
// Userl
// User2
지금까지 플로우를 값이 흐르는 파이프로 생각했습니다. 값이 흐르기 때문에 제외하고, 곱하고, 변형하거나 합치는 등의 여러 가지 방법으로 변경하는 것도 가능합니다. 플로우 생성과 최종 연산 사이의 이러한 연산들을 플로우 처리라고 합니다. 이 장에서는 플로우 처리를 위해 사용하는 함수들을 살펴보겠습니다.
map은 플로우의 각 원소를 변환 함수에 따라 변환하는 메서드 입니다. map 메서드에 정의된 연산에 따라 값이 변환되어 흐르게 됩니다.
suspend fun main() {
flowOf(1, 2, 3) // [1, 2, 3]
.map { it * it } // [1, 4, 9]
.collect { print(it) } // 149
}
map을 구현하려면 flow 빌더를 사용해 새로운 플로우를 만들면 됩니다. 그리고 만들어진 플로우에서 원소들을 모은 뒤, 변형된 원소들을 하나씩 내보내면 됩니다. 다음 코드는 kotlinx.coroutines 라이브러리의 실제 구현을 좀더 간단히 나타낸 것입니다.
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow { // 여기서 새로운 플로우률 만듭니다
collect { value -> //여기서 리시버를 통해 원소를 모옵니다
emit(transform(value))
}
}
filter는 원래 플로우에서 주어진 조건에 맞는 값들만 가진 플로우를 반환하는 메서드입니다.
suspend fun main() {
(1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter { it <= 5 } // [1, 2, 3, 4, 5]
.filter { isEven(it) } // [2, 4]
.collect { print(it) } // 24
}
fun isEven(num: Int): Boolean = num % 2 == 0
filter 또한 플로우 빌더를 사용해 아주 쉽게 구현할 수 있습니다.
fun <T> Flow<T>.filter(
predicate: suspend (T) -> Boolean
): Flow<T> = flow { // 여기서 새로운 플로우를 만듭니다,
collect { value -> // 여기서 리시버를 통해 원소를 모옵니다
if (predicate(value)) {
emit(value)
}
}
}
filter는 관심 없는 원소를 제거할 때 주로 사용됩니다.
특정 수의 원소만 통과시키기 위해 take를 사용할 수 있습니다.
suspend fun main() {
('A'..'Z').asFlow()
.take(5) // [A, B, C, D, E]
.collect { print(it) } // ABCDE
drop을 사용하면 특정 수의 원소를 무시할 수 있습니다.
suspend fun main() {
('A'..'Z').asFlow()
.drop(20) // [U, V, W, X, Y, Z]
.collect { print(it) } // UVWXYZ
두 개의 플로우를 하나의 플로우로 합치는 것에 대해 생각해 봅시다.
합치는 방법에는 여러 가지가 있습니다. 가장 간단한 방법은 두 개의 플로우에서 생성된 원소들을 하나로 합치는 것입니다. 최상위 레벨 함수인 merge를 사용하면 이런 과정을 수행할 수 있습니다.
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
print(together.toList())
// [1, 0.1, 0.2, 0.3, 2, 3]
// 또는 [1,0.1, 0.2, 0.3, 2, 3]
// 또는 [0.1, 1, 2, 3, 0.2, 0.3]
// 또는 다른 가능한 조합 중 하나
}
merge는 한 플로우의 원소가 다른 플로우를 기다리지 않는다는 점이 중요합니다. 첫 번재 플로우의 원소 생성이 지연된다고 해서 두 번째 플로우의 원소 생성이 중단되지는 않습니다.
zip은 두 플로우로부터 쌍을 만들어서 내보냅니다. 그렇기에 두 원소가 쌍을 이루는 방법을 정의하는 함수가 필요합니다. 또한 쌍을 이뤄야하기 때문에 한 원소는 다른 원소가 생성되기를 기다려야 합니다.
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach { delay(400) }
val flow2 = flowOf(l, 2, 3, 4)
.onEach { delay(1000) }
flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}"}
.collect { println(it) }
}
// (1초 후)
// A_1
// (1초 후)
// B_2
// (1초 후)
// C_3
zip은 쌍을 이뤄야 하기 때문에 다른 원소를 기다려야 합니다. 하지만 combine은 기다릴 필요가 없습니다. 첫 번째 쌍이 이미 만들어졌다면 다른 플로우의 이전 원소가 함께 새로운 쌍이 만들어집니다.
suspend fun main() {
val flow1 = flowOf("A", "B","C")
.onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
.collect { println(it) }
}
// (1초 후)
// B_1 -> A는 1원소를 기다리다가 B로 대체됩니다.
// (0.2초 후)
// C_1
// (0.8초 후)
// C_2
// (1초 후)
// C_3
// (1초 후)
// C_4
combine은 두 데이터 소스의 변화를 능동적으로 감지할 때 주로 사용합니다.
위의 예제처럼 첫 쌍이 소실되지 않기를 원한다면 합쳐질 각 플로우에 초기값을 더하면 됩니다.
userUpdateFlow.onStart { emit(currentuser) }
콜렉션에서의 fold는 주어진 원소 각각에 대해 두 개의 값을 하나로 합치는 연산을 적용하여 컬렉션의 모든 값을 하나로 합칩니다. 밑의 예제는 fold를 사용해 값을 더합니다.
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) { acc, i -> acc + i }
println(res) // 10
val res2 = list.fold(1) { acc, i -> acc * i }
println(res2) // 24
}
fold는 flow에서도 사용할 수 있으며, 플로우가 왼료될 때까지 중단됩니다.
suspend fun main() {
val list = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
val res = list.fold(0) { accr i -> acc + i }
println(res)
}
// (4초 후)
// 10
fold 대신 scan을 사용할 수 있습니다. scan은 누적되는 과정의 모든 값을 생성하는 중간 연삽입니다.
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.scan(O) { acc, i -> acc + i }
println(res) // [0, 1, 3, 6, 10]
}
scan은 이전 단계에서 값을 받으면 즉시 새로운 값을 만들기 때문에 Flow에서 유용하게 사용됩니다.
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
.scan(0) { acc, v -> acc + v }
.collect { println(it) }
}
// 0
// (1초 후)
// 1
// (1 초 후)
// 3
// (1초 후)
// 6
// (1초 후)
// 10
scan은 변경해야 할 사항을 플로우로 가지고 있으며, 변경 내역에 대한 객체가 필요할 때 주로 사용합니다.
컬렉션에서 잘 알려진 또 다른 함수는 flatMap입니다. 컬렉션의 경우, flatMap은 맵과 비슷하지만 변환 함수가 평탄화된 컬렉션을 반환해야 한다는 점이 다릅니다.
val allEmployees: List<Employee> = departments
.flatMap { department -> department.employees }
// 맵을 사용하면 리스트의 리스트를 대신 얻게 됩니다.
val listOfListsOfEmployee: List<List<Employee>> = departments
.map { department -> department.employees }
플로우에서 flatMap을 어떻게 봐야할까요? 변환 함수가 평탄화된 플로우를 반환한다고 생각하는 것이 직관적입니다. 다만 리스트와 다른 점은 플로우 원소가 나오는 시간이 다르다는 것입니다.
그렇다면 두 번째 원소에서 만들어진 플로우가 첫 번째 플로우에서 만들어진 원소를 기다려야 할까요? 아니면 동시에 처리해야 할까요? 이런 이유로 Flow에는 flatMap 함수가 없으며, 그 대신 다양한 함수들이 있습니다.
flatMapConcat 함수는 생성된 플로우를 하나씩 처리합니다. 그래서 두 번째 플로우는 첫 번째 플로우가 완료되었을 때 시작할 수 있습니다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem}" }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat { flowFrom(it) }
.collect { println(it) }
}
// (1초 후)
// 1_A
// (1초 후)
// 2_A
// (1초 후)
// 3_A
// (1초 후)
// 1_B
// (1초 후)
// 2_B
// (1초 후)
// 3.B
// (1초 후)
// 1_C
// (1초 후)
// 2_C
// (1초 후)
// 3_C