안드로이드 개발을 하다보면 상태와 이벤트처리를 해야합니다. 기존에는 LiveData, SingleLiveData를 활용해서 View의 상태를 업데이트하거나 이벤트를 전달하는 방법을 제공했습니다.
하지만 LiveData를 도메인 레이어안에서 플랫폼 독립적으로 사용하려고 한다면 안드로이드 의존성 때문에 어려움이 생깁니다. 이전 RxJava를 많이 사용하던 시절에는 RxJava/Kotlin의 Hot Stream인 Subject인 BehaviorSubject를 이용해 상태를 관리하여 안드로이드 플랫폼에 독립적으로 사용했습니다.
그러나 이제는 안드로이드 앱들이 Kotlin으로만 작성되고 있어 RxJava/Kotlin의 의존성없이 Kotlin 코루틴의 Flow를 사용해서 데이터 스트림을 구현할 수 있습니다.
Flow는 기본적으로 Cold Stream 이지만 이를 Hot Stream으로 사용할 수 있는 Flow가 StateFlow
, SharedFlow
입니다. 이 중 최신 상태를 관리하고 전달해 주는 StateFlow
의 특징에 대해서 먼저 알아보겠습니다.
StateFlow
는 컬렉터에게 단일의 최신 데이터 값만 업데이트를 보장하는 핫 플로우입니다.
StateFlow
의 특징은 초기값 가지며 emit()
혹은 value
로 프로퍼티에 접근할 수 있습니다. 또한 이전에 내보낸 값과 동일한 값을 소비자에게 전달하지 않도록 distinctUntilChanged()
와 같은 연산자의 기능을 가지고 있습니다.
📘 StateFlow는 Any.equals를 이용해서 이전에 내보낸 값과 동일한 값인지 비교하고 결합합니다.
Any.equals의 규칙을 따르지 않은 클래스는 StateFlow에서 의도된 동작을 보장하지 못합니다.
private val _stateFlow = MutableStateFlow(0)
val stateFlow = _stateFlow.asStateFlow() // ready-only
_stateFlow.emit(1)
_stateFlow.value = 1
StateFlow
는 항상 최신의 값을 가져오기 때문에 최신 상태를 유지하는 데이터 홀더로써 사용이 권장됩니다
SharedFlow
도 마찬가지로 Hot Flow로 애플리케이션에서 구독자들에게 이벤트를 전달하는 목적으로 사용합니다.
SharedFlow
의 생성은 MutableSharedFlow() 생성자 함수로 만들 수 있습니다. 파라미터가 없는 기본 생성자로 만들 수도 있지만 아래와 같은 파라미터로 SharedFlow
의 옵션들을 설정할 수 있습니다.
replay : 새로운 구독자에게 이전 이벤트를 전달할 개수
extraBufferCapacity : 추가적인 버퍼를 생성하고 emit 한 데이터를 버퍼에 유지
onBufferOverflow : 버퍼가 가득찬 경우 어떤 동작을 할지 정의
SharedFlow
는 replay cache에서 가장 최신의 값들을 가지며 새로운 구독자가 생겼을 때 가장 먼저 replay cache에서 데이터를 가져오고 이후 새로 emit된 값을 가져옵니다. replay cache는 resetReplayCache()
함수로 재설정할 수 있습니다.
또한 replay cache는 버퍼를 제공하는데 뒤늦게 구독한 구독자가 일시 중단하지 않고 버퍼에서 값을 가져올 수 있도록 도와줍니다. 이러한 버퍼 값은 SharedFlow
를 생성할 때 extraBufferCapacity
변수를 사용하여 추가 버퍼를 예약하여 사용할 수 있으며 이렇게 설정한 버퍼 공간은 앞서 구독한 구독자와 뒤늦게 구독한 구독자와의 데이터 지연의 정도를 결정합니다.
SharedFlow
의버퍼사이즈는 replay와 extraBufferCapacity를 합친 값으로 정의되므로 buffer를 사용하기 위해서는 extraBufferCapacity 값을 1이상으로 설정해주어야합니다.
하지만 이렇게 설정한 버퍼도 용량이 가득차게 된다면 오버플로에 맞는 전략을 수립해야합니다. SharedFlow에서 적용 가능한 BufferOverFlow는 3가지를 제공합니다.
BufferOverflow.SUSPEND : 버퍼가 가득찼을 때 업스트림인 send()나 emit()이 blocking 됩니다.
BufferOverflow.DROP_OLDEST : 가장 오래된 값을 버리고 새로운 값을 추가합니다.
BufferOverflow.LATEST : 최신 값을 버리고 새로운 값을 추가합니다.
📘 버퍼 오버플로는 새로운 값을 받을 준비가 안된 최소 하나의 구독자가 있을 때만 발생합니다.
구독자가 하나도 없는 경우에는 최근의 replay value만 저장되며 오버플로는 트리거되지 않으며 만약 BufferOverFlow.SUSPEND
, BufferOverFlow.DROP_LATEST
사용에도 blocking 되지 않습니다.
기본적으로 SharedFlow
는 매개변수 없는 생성자 함수로 생성시 replay cache나 buffer가 없는 StateFlow
를 만들 수 있습니다. 이런 경우에 emit()
함수를 호출하면 구독자가 값을 받을 때 까지 일시중단하며 구독자가 없는 경우는 즉시 리턴합니다.
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
emitSuspend(value)
}
SharedFlow의 내부 코드를 살펴보면 emit()
함수는 tryEmit()
함수가 true
라면 리턴하게 되어있습니다.
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit)
return emitted
}
따라서 tryEmit()
은 구독자가 없는 경우라면 true
를 반환하는 것을 알 수 있습니다. (이 경우 내보내려고 한 값은 즉시 손실됩니다)
SharedFlow를 이용해서 안드로이드에서 이벤트를 전파하는 코드는 다음과 같습니다. 안드로이드에선 일반적으로 ViewModel에서 해당코드를 작성하게 됩니다.
private val _events = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.SUSPEND
)
val events = _events.asSharedFlow()
suspend fun sendEvent(event: Event) {
_events.emit(event)
}
StateFlow와 SharedFlow의 계층 구조는 다음과 같습니다.
StateFlow와 SharedFlow 모두 각각의 특징을 가지고 있어 이러한 Hot Flow들을 적절히 활용하면 직관적이고 간결한 코드를 작성할 수 있습니다.