24.02.13 StateFlow(1)

KSang·2024년 2월 17일
0

TIL

목록 보기
59/101

지난번에 StateFlow의 라이브 데이터와 차이점 SharedFlow 와 차이점에 대해 알아봤는데

좀더 자세히 알아보려고한다.


StateFlow

private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd

    @Suppress("UNCHECKED_CAST")
    public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) { updateState(null, value ?: NULL) }

    override fun compareAndSet(expect: T, update: T): Boolean =
        updateState(expect ?: NULL, update ?: NULL)

    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence: Int
        var curSlots: Array<StateFlowSlot?>? // benign race, we will not use it
        synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                curSequence++ // make it odd
                sequence = curSequence
            } else {
                // update is already in process, notify it, and return
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true // updated
            }
            curSlots = slots // read current reference to collectors under lock
        }
        /*
           Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
           Loop until we're done firing all the changes. This is a sort of simple flat combining that
           ensures sequential firing of concurrent updates and avoids the storm of collector resumes
           when updates happen concurrently from many threads.
         */
        while (true) {
            // Benign race on element read from array
            curSlots?.forEach {
                it?.makePending()
            }
            // check if the value was updated again while we were updating the old one
            synchronized(this) {
                if (sequence == curSequence) { // nothing changed, we are done
                    sequence = curSequence + 1 // make sequence even again
                    return true // done, updated
                }
                // reread everything for the next loop under the lock
                curSequence = sequence
                curSlots = slots
            }
        }
    }

    override val replayCache: List<T>
        get() = listOf(value)

    override fun tryEmit(value: T): Boolean {
        this.value = value
        return true
    }

    override suspend fun emit(value: T) {
        this.value = value
    }

    @Suppress("UNCHECKED_CAST")
    override fun resetReplayCache() {
        throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
    }

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // always check for cancellation
                collectorJob?.ensureActive()
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

    override fun createSlot() = StateFlowSlot()
    override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)

    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseStateFlow(context, capacity, onBufferOverflow)
}

스테이트 플로우 내부로 들어가보면 AbstractSharedFlow(), MutableStateFlow, CancellableFlow, FusibleFlow로 다중 상속을 받은 걸 볼 수 있다.

여기서 기본적인 updateState 는 sequence로 현재 업데이트 상태인지 아닌지를 나타내는데,

상태 업데이트가 시작될 때, sequence 값이 짝수라면 이는 현재 상태 업데이트가 없음을 의미하고, 이때 sequence 값을 1 증가시켜 홀수로 만들어 상태 업데이트가 진행 중임을 표시한다.

updateState 함수는 새로운 상태 값을 설정하고, 해당 업데이트가 모든 수집기(collectors)에게 전파되어야 하는지를 결정한다.

StateFlowImpl에서는 상태 업데이트가 여러 코루틴에서 동시에 발생할 수 있다.

sequence 변수를 통해 이러한 동시성 상황을 관리하며, 상태 업데이트가 안전하게 진행될 수 있도록 한다.

업데이트 과정에서 synchronized 블록을 사용하여 상태 변경이 원자적으로 이루어지고, 모든 업데이트가 올바른 순서로 처리되도록 보장한다.


replayCache

replayCache는 StateFlow의 현재 값을 보유하는 리스트를 반환한다.

StateFlow는 항상 최신 값 하나를 저장하고 이 값을 새로운 구독자에게 재전송하는데 이 특성을 구현하는데 사용된다.

replayCache는 StateFlow의 현재 상태를 빠르게 파악하고 대응하는데 유용하게 사용될 수 있다.


tryEmit

tryEmit(value: T) 메서드는 StateFlow에 새로운 값을 비동기적으로 설정한다.

이 메서드는 항상 true를 반환하며, 이는 StateFlow가 값을 성공적으로 업데이트할 수 있음을 의미한다.

StateFlow의 경우, tryEmit 호출이 실패하는 경우는 없으며, 이는 StateFlow의 단순성과 항상 사용 가능한 현재 상태 값을 보장한다


emit 함수

suspend fun emit(value: T) 메서드는 코루틴 내에서 StateFlow의 값을 업데이트한다.
이 메서드는 tryEmit와 동일하게 작동하며, 새로운 값을 StateFlow에 설정한다.


collect

StateFlow는 기본적으로 코루틴 스코프 에서 사용하고 라이브데이터의 옵저버(observer)대신
collect를 사용한다.

lifecycleScope.launch {
            viewState.collect {

이 collect 내부로 들어가면 이렇게 구성되있다.

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // always check for cancellation
                collectorJob?.ensureActive()
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

와일문으로 무한 루프를 통해 StateFlow의 상태 변화를 지속적으로 감지한다.

와일문 내에서 이전 상태와 현재 상태를 비교하여 변화가 있을 때만 수집기에 새로운 값을 전달한다.

collectorJob 부분에서 현재 코루틴의 작업이 취소되었는지 확인한다.
취소된 경우 ensureActive 메서드가 CancellationException을 던져 루프가 종료된다.

if (!slot.takePending())에서 현재 슬롯이 대기상태(pending)인지 확인한다

새로운 값이 들어올 때까지 대기하기 때문에 StateFlow에서 새로운 상태가 발행될 때까지 효율적으로 대기할 수 있게 된다

asStateFlow

val mutableStateFlow = MutableStateFlow(0)
val stateFlow: StateFlow<Int> = mutableStateFlow.asStateFlow()

읽기 전용인 StateFlow로 변환할 수 있다.

asLiveData

Flow를 LiveData로 변환해주는대, 뷰의 라이프 사이클을 따라야 하는 경우 사용된다.

오늘은 여기까지,,,

0개의 댓글