Flow를 사용하다보면, Hot Flow
를 쓸건지, Cold Flow
를 쓸건지 결정하게 된다. 이때 Hot Flow
를 사용하기로 결정했다면, SharedFlow
나 StateFlow
중 어떤걸 사용해야할 지 고민이 들 수도 있는데, 이 둘의 관계와 차이에 대해 설명하고자 한다. 또한 이에 기반한 각 클래스들의 메서드 활용법 또한 적어보고자 한다.
SharedFlow
와 StateFlow
의 차이에 대해 알아보기 전, Hot Flow
와 Cold Flow
의 차이를 먼저 짚어보고자 한다. 우선 가장 결정적인 차이는 구독자의 생성 여부에 따라 생산자의 데이터 발행이 결정된다는 것이다.
ColdFlow
의 경우, 해당 플로우에 구독자가 생겨야만, 생산자가 데이터를 발행하며, 구독자가 사라졌을 경우, 해당 Flow가 종료된다. 반면, HotFlow
의 경우, 구독자의 데이터 발행 여부와 관계 없이, 생산자가 데이터를 발행할 수 있으며, Flow종료시점의 경우는 상황에따라 다르다.
여기서 중요한 표현을 했는데, 마지막 문장에 '데이터를 발행할 수 있다.'라고 한 점이다. 즉, HotFlow
의 매커니즘 또한 ColdFlow
처럼 될 수 있다는 것을 생각해야 한다. 이 차이는 stateIn()
과 sharedIn()
메서드(coldFlow
를 HotFlow
로 바꿔줌)를 사용하여 sharingStarted
파라미터를 어떻게 주었느냐에따라 달라진다.
stateIn()
과 같은 메서드를 사용하면 대략 아래와 같은 그림을 띈다.
flow {
emit("")
}.stateIn(
scope = CoroutineScope(Dispatchers.IO),
started = SharingStarted.WhileSubscribed(5000L),
initialValue = ""
)
위 그림 내, started
파라미터에 따라, 해당 Hot Flow
의 데이터 생산과 종료 시점이 달라지게 된다.
SharedFlow
의 정의는 다음과 같다.
이벤트성으로 데이터를 반복적으로 발행하기 좋은 HotFlow
SharedFlow
는 StateFlow
와는 다르게 이벤트성으로 데이터를 발행하기 적합하다. StateFlow
의 경우, 동일한 데이터를 발행한다 하더라도, 구독자는 이를 수신받을 수 없다. 하지만, SharedFlow
는 아니다.
그러기에 SharedFlow
는 StateFlow
와는 다른 특징을 지니는데, 바로 replacyCache
와 buffer
의 개념이다.
[replayCache]
생산자가 최근에 발행한 데이터를 최근에 생긴 구독자에게 몇개까지 데이터를 보여줄지를 결정하는 파라미터를 replayCache
라 한다. 예를 들어, 최근에 데이터 생산자가 데이터를 10개를 오름차순으로 발행했다.(0, 1, 2,...) 이때, replayCache
가 3으로 설정되어 있을 때, 최근에 발행되었던 데이터 9, 8, 7을 수신받는다.
[buffer]
buffer
를 사용하려거든, 아래 사항을 반드시 점검할 필요가 있다.
구독자의 데이터 처리 속도가 생산자의 데이터 발행 속도보다 느린가?
만약 그렇다면 신규 발행 데이터가 중간에 저장될 공간이 필요하게 되고, 이때 buffer
사용을 고려할 수 있다. buffer
란, 구독자의 데이터 처리 속도가 생산자의 데이터 발행 속도보다 느릴 때, 신규 발행 데이터를 중간에 저장해놓을 공간을 의미한다. 또한 버퍼 사용을 결정한다 했을 경우, 버퍼 처리 정책에 대해서도 또한 결정해줘야 한다. 이는 onBufferOverflow
파라미터를 의미하며, DROP_LATEST
, DROP_OLDEST
, SUSPEND
타입을 추가 지정하여 버퍼의 처리 정책을 정의할 수 있다.
replayCache
와 buffer
의 값을 조율한 도표를 그려보면 아래와 같다.
첫 번째로 생산자가 데이터를 발행한다. 첫 발행 데이터는 replayCache
를 거쳐, 를 필연적으로 구독자에게 바로 전달된다. 그 후, 구독자는 해당 데이터에 있어 연산을 수행한다. 이렇게 연산을 수행하는 사이, 생산자는 추가 데이터를 발행한다. 해당 데이터 또한 replayCache
를 거치게 되지만, 이는 구독자에게 전달될 수 없다. 이유는 구독자가 첫 번째로 전달된 데이터를 이미 처리하고 있기 때문이다.
따라서 이런 경우, 해당 데이터는 buffer
에 저장되며, 구독자의 데이터 처리가 완료된 직후 바로 사용될 수 있게 된다. 즉, 선입선출의 형태로 동작하는 것이다.
[onBufferOverflow]
buffer
가 가득 찬 상황에서, 생산자가 데이터를 발행했을 때, buffer
의 처리 정책을 의미한다. DROP_LATEST
의 경우, 가장 최근의 데이터를 소실시킴과 동시에 신규 발행 데이터를 buffer
에 적재한다. DROP_OLDEST
의 경우, 가장 오래된 데이터(=buffer
내, 가장 오른쪽의 데이터)를 소실시킴과 동시에 신규 발행 데이터를 buffer
에 적재한다. SUSPEND
의 경우, 구독자가 데이터 처리를 완료함과 동시에 buffer
의 오래된 데이터가 구독자에게 전달되고, buffer
의 여유공간이 남을때까지, 생산자의 중단함수(ex. emit()
)을 중단한다는 의미이다.
따라서 해당 옵션은 정확히 아래 상황 때 사용하기 위함임을 유추할 수 있다.
구독자가 발행한 데이터를 처리하는 중에 있으며, 생산자가 데이터 계속 추가 발행하고 있음.
[emit vs tryEmit]
SharedFlow
가 데이터를 발행하는 방법은 크게 2가지가 있다.
emit()
: 중단함수로 정의되어 있으며, 신규 발행 데이터가 buffer
또는 구독자에게 전달되지 못했을 때, 생산자를 중단시킴(단, onBufferOverFlow.SUSPEND
로 선언돼 있어야 함)tryEmit()
: 일반함수로 정의되어 있고, 신규 발행 데이터가 buffer
또는 구독자에게 전달되지 못했을 때, false를 반환만약, 코루틴 환경 내에서, 데이터 발행 실패로 생산자 코드의 중단 및 다음 순서 진행을 막고자 할 땐, emit()
사용을 고려할 수 있다. 하지만 데이터 발행 실패를 true
/ false
로 반환받고 이에 따른 조건문을 실행하고자할 땐, tryEmit()
사용을 고려해볼 수 있다.
StateFlow
의 정의는 다음과 같다.
원자적으로 상태를 업데이트하기 좋은 HotFlow
따라서 StateFlow
는 update()
or updateAndGet()
과 같은 메서드를 많이 사용하며, 이는 멀티 스레드 환경에서 상태값을 원자적으로 업데이트할 수 있게한다. 따라서 멀티스레드 환경 내, 경쟁상태로 인한 상태값의 무결성 파괴를 방지한다. 아래는 메서드를 활용한 간단한 샘플코드로, 멀티 스레드 환경 내, update
메서드를 활용하여 데이터 무결성이 보장된다는 것을 알 수 있다.
fun dataIntegrityTest() {
repeat(1000) {
viewModelScope.launch(Dispatchers.Default) {
_counter.update { _counter.value + 1 } // 1000 나옴
_counter.value += 1 // 321과 같은 값 나옴
}
}
}
[replayCache는 기본 1, buffer는 없음]
StateFlow
에 구독자를 생산해서 데이터를 받아보면, 가장 최근에 업데이트된 상태값을 받는다. 이는 replayCache
가 기본 1로 설정되었기 때문이다. 또한 StateFlow
를 사용하는 목적은 데이터 무결성이 보장된 최신 상태값을 조회하는 것이므로, 이전에 갱신되었던 상태값 또한 받을 필요가 없으며, 받을 수도 없다.(=replayCache
수정 불가)
또한 StateFlow
는 무결성이 보장된 최신 상태값을 보유하는 것이 중요하다. 따라서 굳이 buffer
를 설정할 필요가 없으며 존재하지도 않는다. (bufferCapacity
, onBufferOverflow
설정이 불가)
[emit, tryEmit쓰는건 바보]
StateFlow
는 SharedFlow
를 상속받았다. 따라서 emit()
과 tryEmit()
을 그대로 사용할 수 있다. 또한 emit()
, tryEmit()
을 사용해도 StateFlow
의 내부 상태값이 변경되는걸 확인할 수 있다. 하지만 이는 그다지 추천할만한 방법이 아니다.
StateFlow
는 애초에 update()
, updateAndGet()
과 같은 원자성을 보장해주며 상태값을 갱신해주는 메서드를 제공해준다. 하지만, 데이터 무결성이 깨질 수 있는 메서드인 emit()
와 tryEmit()
을 사용해서 상태값을 갱신할 필요가 있을까? 해당 메서드를 사용할거면, SharedFlow()
의 사용을 한번 더 고려해보는게 좋을 수도 있다.
2가지 정의만 기억하면 된다.
SharedFlow
: 데이터를 이벤트성으로 연속적으로 발행하기 좋은 Hot FlowStateFlow
: 상태값을 원자적으로 업데이트하지 좋은 Hot Flow[SharedFlow]
ViewModelTest
와 같은 단위테스트를 진행할 때, TestRepository
또한 임의로 만들어 진행하게 된다. 이때 TestRepository
내부엔 A라는 메서드의 동작으로 인해 B메서드가 결과값을 반응적으로 수신받아야 할 경우가 있다. 자세한 사항은 Now In Android의 단위테스트를 참고하면 좋을것 같다.RepositoryImpl
내, SharedFlow
를 사용하여 동기화가 필요한 데이터를 이벤트성으로 내려주는 방법이 있다. (이때, RepositoryImpl
은 Singletone) [StateFlow]
StateFlow()
의 update()
를 사용한다.