collect가 가장 대표적인 flow를 시작하는 suspend fun이지만 다른 fun들도 있다고 한다. 알아보자
4가지가 소개되는데 reduce부터 알아보자
/**
* Accumulates value starting with the first element and applying [operation] to current accumulator value and each element.
* Throws [NoSuchElementException] if flow was empty.
*/
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
var accumulator: Any? = NULL
collect { value ->
accumulator = if (accumulator !== NULL) {
@Suppress("UNCHECKED_CAST")
operation(accumulator as S, value) // 두번째부턴 연산하여 축적
} else {
value // 첫번째 요소일 경우
}
}
if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
@Suppress("UNCHECKED_CAST")
return accumulator as S
}
이렇게 구현되어 있다. 축적하는 값의 시작을 컬렉션의 첫번째 요소로 설정하여 연산을 이어나간다.
val sum = list.asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum) // result: 55
/**
* Accumulates value starting with [initial] value and applying [operation] current accumulator value and each element
*/
public suspend inline fun <T, R> Flow<T>.fold(
initial: R,
crossinline operation: suspend (acc: R, value: T) -> R
): R {
var accumulator = initial // 초기값 설정
collect { value ->
accumulator = operation(accumulator, value) // 들어오는 값 모두 연산하여 축적
}
return accumulator
}
fold()의 구현이다. reduce()랑 거의 비슷하지만 초기값을 직접 지정한다.
val sum2 = list.asFlow().fold(0) { a, b -> a + b }
println(sum2)
이름만 보고 첫 값만 받는줄? 알았으나 조건에 첫번째로 만족하는 값을 가져오는 fun이었다.
/**
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
*/
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
var result: Any? = NULL
collectWhile {
if (predicate(it)) {
result = it // 조건 만족하면
false // 종료
} else {
true
}
}
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
return result as T
}
collectWhile도 같이 살펴보자
// Internal building block for non-tailcalling flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
// Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
// the resulting code is never tail-suspending and produces a state-machine
if (!predicate(value)) {
throw AbortFlowException(this) // 값을 찾으면 종료
}
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}
tail-suspending이란?
collectWhile fun에서 나와있어 궁금해졌다.
우선 꼬리호출 최적화에 대해서 알아야 한다.
a 함수가 마지막 동작으로 b 함수를 호출할때 스택에서 a함수를 제거하는 게 꼬리호출 최적화다. 마지막 동작이므로 a 함수를 스택에 유지할 필요가 없기 때문이다.
그런데 coroutine의 suspend 함수의 경우 일시중단한 시점의 상태를 유지해야하기 때문에 마지막 호출이어도 이 최적화가 일어나지 않는 것을 의미한다.
- Coroutine은 스택이 아니라 상태 기반 머신으로 동작한다
val first = list.asFlow().first { it ->
it == 2
}
println(first) // result: 2
flow가 하나의 값만 방출하는지 확인한다.
/**
* The terminal operator that awaits for one and only one value to be emitted.
* Throws [NoSuchElementException] for empty flow and [IllegalArgumentException] for flow
* that contains more than one element.
*/
public suspend fun <T> Flow<T>.single(): T {
var result: Any? = NULL
collect { value ->
require(result === NULL) { "Flow has more than one element" } // 또 들어오면
result = value // 첫번째로 들어온 값 할당
}
if (result === NULL) throw NoSuchElementException("Flow is empty")
return result as T
}
require()도 살펴봐야겠다
/**
* Throws an [IllegalArgumentException] with the result of calling [lazyMessage] if the [value] is false.
*
* @sample samples.misc.Preconditions.failRequireWithLazyMessage
*/
@kotlin.internal.InlineOnly
public inline fun require(value: Boolean, lazyMessage: () -> Any): Unit {
contract {
returns() implies value
}
if (!value) {
val message = lazyMessage()
throw IllegalArgumentException(message.toString()) // false면 message로 IllegalArgumentException 던지기
}
}
간단히 말해서 값이 2개 이상 들어오면 IllegalArgumentException을 던진다.
val single = list.asFlow().singleOrNull()
println(single) // result: null
라이브러리 소스코드를 받아서 구현을 확인하면서 보는게 확실히 더 재미있는 것 같다. 꼬리호출 최적화는 알아두면 좋을 것 같다. 그리고 Coroutine은 상태 머신 기반으로 동작하는 것도!