stateFlow 가 동일한 값을 방출하지 않는 이유

이지훈·2023년 4월 21일
0

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/distinct-until-changed.html

코틀린 공식 문서에 Flow의 연산자 중에 하나인 distinctUntilChanged 의 문서를 확인해보면 다음과 같은 설명이 쓰여있는 것을 확인 할 수 있다.

Note that any instance of StateFlow already behaves as if distinctUntilChanged operator is applied to it, so applying distinctUntilChanged to a StateFlow has no effect.
See StateFlow documentation on Operator Fusion. Also, repeated application of distinctUntilChanged operator on any flow has no effect.

StateFlow 하이퍼 링크를 타고 들어가면 다음과 같은 연산자들도 역시 StateFlow에 적용할 경우 no effect 라고 한다.

나머지 연산자들에 대해선 추후에 확인 해보도록하고, 기존에 알고있던 바에 의하면,
StateFlow 는 내부적으로 distinctUntilChanged 기능을 수행하기 때문에 해당 연산자를 붙혀주지 않아도 된다고 알고 있었다.

당시에 StateFlow를 학습할때는 그 연산자들을 어떻게 사용하는지 방법들을 숙지하느라 급급해서 빠르게 넘어갔던 내용이지만, 내부적으로 어떻게 구현되어있길래 사용할 필요가 없다고 하는지 알아보도록 하겠다.
(최근에 컨퍼런스에 참여하면서 주니어, 시니어 개발자분들께 중요하다고 반복해서 강조 하셨던 조언이, 기술을 그냥 사용만 하지 말고, 사용하는 기술들이 내부적으로 어떻게 동작하는지 내부 코드를 확인하며 그 흐름을 이해 해보라는 것이었는데, 앞으로 많은 기술들을 까보려고 한다!)

바로 StateFlow 에 마우스 올리고, ctrl(or cmd) + 좌클릭 on

StateFlow 의 구현체는 생각보다 그 길이가 짧다. 전체 코드를 올려도 부담이 없을 정도로, 따라서 올려보도록 하겠다.

public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd

    @Suppress("UNCHECKED_CAST")
    public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) { updateState(null, value ?: NULL) }

    override fun compareAndSet(expect: T, update: T): Boolean =
        updateState(expect ?: NULL, update ?: NULL)

    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence = 0
        var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
        synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                curSequence++ // make it odd
                sequence = curSequence
            } else {
                // update is already in process, notify it, and return
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true // updated
            }
            curSlots = slots // read current reference to collectors under lock
        }
        /*
           Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
           Loop until we're done firing all the changes. This is a sort of simple flat combining that
           ensures sequential firing of concurrent updates and avoids the storm of collector resumes
           when updates happen concurrently from many threads.
         */
        while (true) {
            // Benign race on element read from array
            curSlots?.forEach {
                it?.makePending()
            }
            // check if the value was updated again while we were updating the old one
            synchronized(this) {
                if (sequence == curSequence) { // nothing changed, we are done
                    sequence = curSequence + 1 // make sequence even again
                    return true // done, updated
                }
                // reread everything for the next loop under the lock
                curSequence = sequence
                curSlots = slots
            }
        }
    }

    override val replayCache: List<T>
        get() = listOf(value)

    override fun tryEmit(value: T): Boolean {
        this.value = value
        return true
    }

    override suspend fun emit(value: T) {
        this.value = value
    }

    @Suppress("UNCHECKED_CAST")
    override fun resetReplayCache() {
        throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
    }

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // always check for cancellation
                collectorJob?.ensureActive()
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

    override fun createSlot() = StateFlowSlot()
    override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)

    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseStateFlow(context, capacity, onBufferOverflow)
}

우선 제일 위에서 확인 할 수 있듯이, StateFlow 는 SharedFlow를 상속하여 만들어진 형태이고, out T 키워드를 통해 공변성을 허용하여 하위 타입의 인스턴스를 상위 타입의 인스턴스로 간주할 수 있게 되며, 클래스의 반환타입으로만 사용 가능하다.

제네릭과 공변성의 관한 더 자세한 내용은 해당 블로그 글을 참고하면 좋을 것 같다.

https://medium.com/mj-studio/코틀린-제네릭-in-out-3b809869610e

그리고 글의 핵심 내용인 distinctUntilChanged 의 역할을 하는 코드는 값을 수집하는 collect 함수의 내부에 포함되어있다.

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // always check for cancellation
                collectorJob?.ensureActive()
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

collect 함수는 내부적으로 무한 루프를 돌고 있는 구조이며(break 를 호출하지 않아 종료되지 않음)

if (oldState == null || oldState != newState) {
	collector.emit(NULL.unbox(newState))
    oldState = newState
}

이전 값과의 비교를 통해 값이 같지 않을때만 방출을 한다

여기서 드는 의문은 수집을 위한 collect 함수인데 emit, 방출을 하고 있네? 인데, collect 함수는 Flow에서 값을 수집하고, 그 수집된 값을 처리하기 위해 FlowCollector interface의 emit을 호출한다고 생각하면 좋을 것 같다.

public fun interface FlowCollector<in T> {

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}

그리고 그 전달된 값을 처리하는 로직인 emit은 FlowCollector를 구현하는 구현체들에서 각각 다르게 정의된다.

참고 자료)
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/

https://medium.com/@jaesungleee/android-상태와-이벤트를-효과적으로-다루기-위한-방법-fef79f572189

profile
실력은 고통의 총합이다. Android Developer

0개의 댓글