[kotlin] Terminal flow operators

문돌이 개발자·2025년 1월 3일

Terminal flow operators

collect가 가장 대표적인 flow를 시작하는 suspend fun이지만 다른 fun들도 있다고 한다. 알아보자

4가지가 소개되는데 reduce부터 알아보자

reduce()

/**
 * Accumulates value starting with the first element and applying [operation] to current accumulator value and each element.
 * Throws [NoSuchElementException] if flow was empty.
 */
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    var accumulator: Any? = NULL

    collect { value ->
        accumulator = if (accumulator !== NULL) {
            @Suppress("UNCHECKED_CAST")
            operation(accumulator as S, value) // 두번째부턴 연산하여 축적
        } else {
            value // 첫번째 요소일 경우
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    @Suppress("UNCHECKED_CAST")
    return accumulator as S
}

이렇게 구현되어 있다. 축적하는 값의 시작을 컬렉션의 첫번째 요소로 설정하여 연산을 이어나간다.

    val sum = list.asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum) // result: 55

fold()

/**
 * Accumulates value starting with [initial] value and applying [operation] current accumulator value and each element
 */
public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R {
    var accumulator = initial // 초기값 설정
    collect { value ->
        accumulator = operation(accumulator, value) // 들어오는 값 모두 연산하여 축적
    }
    return accumulator
}

fold()의 구현이다. reduce()랑 거의 비슷하지만 초기값을 직접 지정한다.

    val sum2 = list.asFlow().fold(0) { a, b -> a + b }
    println(sum2)

first()

이름만 보고 첫 값만 받는줄? 알았으나 조건에 첫번째로 만족하는 값을 가져오는 fun이었다.

/**
 * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
 * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
 */
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
    var result: Any? = NULL
    collectWhile {
        if (predicate(it)) {
            result = it // 조건 만족하면
            false // 종료
        } else {
            true
        }
    }
    if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
    return result as T
}

collectWhile도 같이 살펴보자

// Internal building block for non-tailcalling flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
            // the resulting code is never tail-suspending and produces a state-machine
            if (!predicate(value)) {
                throw AbortFlowException(this) // 값을 찾으면 종료
            }
        }
    }
    try {
        collect(collector)
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}

tail-suspending이란?
collectWhile fun에서 나와있어 궁금해졌다.
우선 꼬리호출 최적화에 대해서 알아야 한다.
a 함수가 마지막 동작으로 b 함수를 호출할때 스택에서 a함수를 제거하는 게 꼬리호출 최적화다. 마지막 동작이므로 a 함수를 스택에 유지할 필요가 없기 때문이다.
그런데 coroutine의 suspend 함수의 경우 일시중단한 시점의 상태를 유지해야하기 때문에 마지막 호출이어도 이 최적화가 일어나지 않는 것을 의미한다.

  • Coroutine은 스택이 아니라 상태 기반 머신으로 동작한다
    val first = list.asFlow().first { it ->
        it == 2
    }
    println(first) // result: 2

single()

flow가 하나의 값만 방출하는지 확인한다.

/**
* The terminal operator that awaits for one and only one value to be emitted.
* Throws [NoSuchElementException] for empty flow and [IllegalArgumentException] for flow
* that contains more than one element.
*/
public suspend fun <T> Flow<T>.single(): T {
   var result: Any? = NULL
   collect { value ->
       require(result === NULL) { "Flow has more than one element" } // 또 들어오면 
       result = value // 첫번째로 들어온 값 할당
   }

   if (result === NULL) throw NoSuchElementException("Flow is empty")
   return result as T
}

require()도 살펴봐야겠다

/**
 * Throws an [IllegalArgumentException] with the result of calling [lazyMessage] if the [value] is false.
 *
 * @sample samples.misc.Preconditions.failRequireWithLazyMessage
 */
@kotlin.internal.InlineOnly
public inline fun require(value: Boolean, lazyMessage: () -> Any): Unit {
    contract {
        returns() implies value
    }
    if (!value) {
        val message = lazyMessage()
        throw IllegalArgumentException(message.toString()) // false면 message로 IllegalArgumentException 던지기
    }
}

간단히 말해서 값이 2개 이상 들어오면 IllegalArgumentException을 던진다.

    val single = list.asFlow().singleOrNull()
    println(single) // result: null

라이브러리 소스코드를 받아서 구현을 확인하면서 보는게 확실히 더 재미있는 것 같다. 꼬리호출 최적화는 알아두면 좋을 것 같다. 그리고 Coroutine은 상태 머신 기반으로 동작하는 것도!

profile
까먹고 다시 보려고 남기는 기록

0개의 댓글