SharedFlow와 StateFlow의 차이?

SSY·2024년 11월 5일
1

Flow

목록 보기
5/5

시작하며

Flow를 사용하다보면, Hot Flow를 쓸건지, Cold Flow를 쓸건지 결정하게 된다. 이때 Hot Flow를 사용하기로 결정했다면, SharedFlowStateFlow중 어떤걸 사용해야할 지 고민이 들 수도 있는데, 이 둘의 관계와 차이에 대해 설명하고자 한다. 또한 이에 기반한 각 클래스들의 메서드 활용법 또한 적어보고자 한다.

HotFlow vs ColdFlow

SharedFlowStateFlow의 차이에 대해 알아보기 전, Hot FlowCold Flow의 차이를 먼저 짚어보고자 한다. 우선 가장 결정적인 차이는 구독자의 생성 여부에 따라 생산자의 데이터 발행이 결정된다는 것이다.

ColdFlow의 경우, 해당 플로우에 구독자가 생겨야만, 생산자가 데이터를 발행하며, 구독자가 사라졌을 경우, 해당 Flow가 종료된다. 반면, HotFlow의 경우, 구독자의 데이터 발행 여부와 관계 없이, 생산자가 데이터를 발행할 수 있으며, Flow종료시점의 경우는 상황에따라 다르다.

여기서 중요한 표현을 했는데, 마지막 문장에 '데이터를 발행할 수 있다.'라고 한 점이다. 즉, HotFlow의 매커니즘 또한 ColdFlow처럼 될 수 있다는 것을 생각해야 한다. 이 차이는 stateIn()sharedIn()메서드(coldFlowHotFlow로 바꿔줌)를 사용하여 sharingStarted파라미터를 어떻게 주었느냐에따라 달라진다.

HotFlow의 데이터 생산과 구독 타이밍

stateIn()과 같은 메서드를 사용하면 대략 아래와 같은 그림을 띈다.

flow {
    emit("")
}.stateIn(
    scope = CoroutineScope(Dispatchers.IO),
    started = SharingStarted.WhileSubscribed(5000L),
    initialValue = ""
)

위 그림 내, started파라미터에 따라, 해당 Hot Flow의 데이터 생산과 종료 시점이 달라지게 된다.

  • SharingStarted.WhiltSubscribed(5000L) : 첫 구독자가 생기는 즉시, 생산자가 데이터 발행을 시작한다. Flow 종료의 경우, 모든 생산자가 사라진 후, 파라미터에 적힌 시간 경과 이후 Flow가 사라진다.
  • SharingStarted.Lazy : 첫 구독자가 생기는 즉시, 생산자가 데이터 발행을 시작한다. 그 후, 스트림은 종료되지 않는다.
  • SharingStarted.Eagerly : 첫 구독자 생성 여부와 관계 없이, 생산자가 데이터 발행을 시작한다. 그 후, 스트림은 종료되지 않는다.

SharedFlow

SharedFlow의 정의는 다음과 같다.

이벤트성으로 데이터를 반복적으로 발행하기 좋은 HotFlow

SharedFlowStateFlow와는 다르게 이벤트성으로 데이터를 발행하기 적합하다. StateFlow의 경우, 동일한 데이터를 발행한다 하더라도, 구독자는 이를 수신받을 수 없다. 하지만, SharedFlow는 아니다.

그러기에 SharedFlowStateFlow와는 다른 특징을 지니는데, 바로 replacyCachebuffer의 개념이다.

[replayCache]
생산자가 최근에 발행한 데이터를 최근에 생긴 구독자에게 몇개까지 데이터를 보여줄지를 결정하는 파라미터를 replayCache라 한다. 예를 들어, 최근에 데이터 생산자가 데이터를 10개를 오름차순으로 발행했다.(0, 1, 2,...) 이때, replayCache가 3으로 설정되어 있을 때, 최근에 발행되었던 데이터 9, 8, 7을 수신받는다.

[buffer]
buffer를 사용하려거든, 아래 사항을 반드시 점검할 필요가 있다.

구독자의 데이터 처리 속도가 생산자의 데이터 발행 속도보다 느린가?

만약 그렇다면 신규 발행 데이터가 중간에 저장될 공간이 필요하게 되고, 이때 buffer사용을 고려할 수 있다. buffer란, 구독자의 데이터 처리 속도가 생산자의 데이터 발행 속도보다 느릴 때, 신규 발행 데이터를 중간에 저장해놓을 공간을 의미한다. 또한 버퍼 사용을 결정한다 했을 경우, 버퍼 처리 정책에 대해서도 또한 결정해줘야 한다. 이는 onBufferOverflow파라미터를 의미하며, DROP_LATEST, DROP_OLDEST, SUSPEND타입을 추가 지정하여 버퍼의 처리 정책을 정의할 수 있다.

replayCachebuffer의 값을 조율한 도표를 그려보면 아래와 같다.

첫 번째로 생산자가 데이터를 발행한다. 첫 발행 데이터는 replayCache를 거쳐, 를 필연적으로 구독자에게 바로 전달된다. 그 후, 구독자는 해당 데이터에 있어 연산을 수행한다. 이렇게 연산을 수행하는 사이, 생산자는 추가 데이터를 발행한다. 해당 데이터 또한 replayCache를 거치게 되지만, 이는 구독자에게 전달될 수 없다. 이유는 구독자가 첫 번째로 전달된 데이터를 이미 처리하고 있기 때문이다.

따라서 이런 경우, 해당 데이터는 buffer에 저장되며, 구독자의 데이터 처리가 완료된 직후 바로 사용될 수 있게 된다. 즉, 선입선출의 형태로 동작하는 것이다.

[onBufferOverflow]
buffer가 가득 찬 상황에서, 생산자가 데이터를 발행했을 때, buffer의 처리 정책을 의미한다. DROP_LATEST의 경우, 가장 최근의 데이터를 소실시킴과 동시에 신규 발행 데이터를 buffer에 적재한다. DROP_OLDEST의 경우, 가장 오래된 데이터(=buffer내, 가장 오른쪽의 데이터)를 소실시킴과 동시에 신규 발행 데이터를 buffer에 적재한다. SUSPEND의 경우, 구독자가 데이터 처리를 완료함과 동시에 buffer의 오래된 데이터가 구독자에게 전달되고, buffer의 여유공간이 남을때까지, 생산자의 중단함수(ex. emit())을 중단한다는 의미이다.

따라서 해당 옵션은 정확히 아래 상황 때 사용하기 위함임을 유추할 수 있다.

구독자가 발행한 데이터를 처리하는 중에 있으며, 생산자가 데이터 계속 추가 발행하고 있음.

[emit vs tryEmit]
SharedFlow가 데이터를 발행하는 방법은 크게 2가지가 있다.

  • emit() : 중단함수로 정의되어 있으며, 신규 발행 데이터가 buffer 또는 구독자에게 전달되지 못했을 때, 생산자를 중단시킴(단, onBufferOverFlow.SUSPEND로 선언돼 있어야 함)
  • tryEmit() : 일반함수로 정의되어 있고, 신규 발행 데이터가 buffer 또는 구독자에게 전달되지 못했을 때, false를 반환

만약, 코루틴 환경 내에서, 데이터 발행 실패로 생산자 코드의 중단 및 다음 순서 진행을 막고자 할 땐, emit()사용을 고려할 수 있다. 하지만 데이터 발행 실패를 true / false로 반환받고 이에 따른 조건문을 실행하고자할 땐, tryEmit()사용을 고려해볼 수 있다.

StateFlow

StateFlow의 정의는 다음과 같다.

원자적으로 상태를 업데이트하기 좋은 HotFlow

따라서 StateFlowupdate() or updateAndGet()과 같은 메서드를 많이 사용하며, 이는 멀티 스레드 환경에서 상태값을 원자적으로 업데이트할 수 있게한다. 따라서 멀티스레드 환경 내, 경쟁상태로 인한 상태값의 무결성 파괴를 방지한다. 아래는 메서드를 활용한 간단한 샘플코드로, 멀티 스레드 환경 내, update메서드를 활용하여 데이터 무결성이 보장된다는 것을 알 수 있다.

fun dataIntegrityTest() {
        repeat(1000) {
            viewModelScope.launch(Dispatchers.Default) {
                _counter.update { _counter.value + 1 } // 1000 나옴
                _counter.value += 1 // 321과 같은 값 나옴
            }
        }
    }

[replayCache는 기본 1, buffer는 없음]
StateFlow에 구독자를 생산해서 데이터를 받아보면, 가장 최근에 업데이트된 상태값을 받는다. 이는 replayCache가 기본 1로 설정되었기 때문이다. 또한 StateFlow를 사용하는 목적은 데이터 무결성이 보장된 최신 상태값을 조회하는 것이므로, 이전에 갱신되었던 상태값 또한 받을 필요가 없으며, 받을 수도 없다.(=replayCache수정 불가)

또한 StateFlow는 무결성이 보장된 최신 상태값을 보유하는 것이 중요하다. 따라서 굳이 buffer를 설정할 필요가 없으며 존재하지도 않는다. (bufferCapacity, onBufferOverflow설정이 불가)

[emit, tryEmit쓰는건 바보]
StateFlowSharedFlow를 상속받았다. 따라서 emit()tryEmit()을 그대로 사용할 수 있다. 또한 emit(), tryEmit()을 사용해도 StateFlow의 내부 상태값이 변경되는걸 확인할 수 있다. 하지만 이는 그다지 추천할만한 방법이 아니다.

StateFlow는 애초에 update(), updateAndGet()과 같은 원자성을 보장해주며 상태값을 갱신해주는 메서드를 제공해준다. 하지만, 데이터 무결성이 깨질 수 있는 메서드인 emit()tryEmit()을 사용해서 상태값을 갱신할 필요가 있을까? 해당 메서드를 사용할거면, SharedFlow()의 사용을 한번 더 고려해보는게 좋을 수도 있다.

When To Use SharedFlow or StateFlow?

2가지 정의만 기억하면 된다.

  • SharedFlow : 데이터를 이벤트성으로 연속적으로 발행하기 좋은 Hot Flow
  • StateFlow : 상태값을 원자적으로 업데이트하지 좋은 Hot Flow

[SharedFlow]

  • ViewModelTest와 같은 단위테스트를 진행할 때, TestRepository또한 임의로 만들어 진행하게 된다. 이때 TestRepository 내부엔 A라는 메서드의 동작으로 인해 B메서드가 결과값을 반응적으로 수신받아야 할 경우가 있다. 자세한 사항은 Now In Android의 단위테스트를 참고하면 좋을것 같다.
  • C라는 화면에서 이벤트가 발생했고, 이때 이전 백스택에 있던 화면 A, B에도 상태값을 반영해줘야 할 때, RepositoryImpl내, SharedFlow를 사용하여 동기화가 필요한 데이터를 이벤트성으로 내려주는 방법이 있다. (이때, RepositoryImpl은 Singletone)

[StateFlow]

  • ViewModel내부엔, 여러 코루틴들이 동시에 실행되어 UI를 갱신할 가능성이 존재한다. 하나의 이벤트로 인해, 하나의 ViewModel내 A, B코루틴이 실행됐고, 하나의 UiState를 동시에 수정한다 했을 때, 경쟁상태 발생으로 인해 데이터 무결성이 깨질 수도 있다. 이때, 원자적 업데이트를 위해 StateFlow()update()를 사용한다.
profile
불가능보다 가능함에 몰입할 수 있는 개발자가 되기 위해 노력합니다.

0개의 댓글

관련 채용 정보