저번 시간에 Rx 라이브러리, 즉 reactive programming을 하는 이유를 알아봤다. RxJava에 대해 공부를 하던 중 눈에 띄는 것이 하나 있는데 바로 Coroutine Flow이다. Coroutine은 비동기 작업을 간편하게 처리할 수 있게 해주는 lightweight thread이다. Coroutine Flow란 무엇일까.
"Rx is An API for asynchronous programming with observable streams"
Rx(Java)는 옵저버블 스트림으로 비동기 프로그래밍을 하기 위한 api라고 한다. 쉽게 말해선 비동기 처리를 위해 사용하는 Api이다. 그리고 Coroutine에서도 이와 같은 기능을 제공하기 위해 Coroutine Flow Api를 제공하는 것이다. 저번 시간에 Reactive Programming, 즉 Rx는 '순차적으로 실행되는 데이터 스트림을 비동기적으로 처리할 수 있다'라고 했는데, Coroutine Flow도 이와 같은 역할을 수행한다고 보면 되겠다. 이름부터 Flow니 데이터 스트림을 처리하는 무언가라고 강하게 예측된다.
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
Rx의 Publisher를 알고 있다면, 이 Flow의 의도가 무엇인지 이해가 쉽다. Int 타입의 데이터가 흐르는 데이터 스트림을 시작하는 함수이다. 100ms 간격으로 1, 2, 3의 데이터가 순차적으로 흐른다.
fun main() = runBlocking<Unit> {
val flow = foo()
//flow 시작
flow.collect { value ->
println(value)
}
}
위의 foo()함수로 Int타입 데이터 스트림을 만들고, 아래 collect에서 데이터 스트림에 흐르는 데이터가 관찰되면 해당 데이터를 println해주는 코드이다. flow가 publisher, collect가 subscribe()의 역할임을 알 수 있다. 결국 Coroutine Flow는 Reactive Programming을 위한 코루틴의 구성요소인 것이다.
저번 Reactive Programming에서 Publisher와 Subscriber를 다뤘다. 음. Coroutine flow에서 Producer와 Counsumer의 역할은 예상이 간다. 그런데 Intermediary(중간 연산자)는 무엇일까? 일단 RxJava와 Coroutine flow는 비동기 데이터 스트림에 대해 다룬다. 그리고 데이터 스트림에서 관찰되는 "모든" 데이터를 "관찰된 그대로" 사용할까?
아니다. 상황에 맞게 필요한 데이터만 가져와서 사용하거나, 데이터를 Consumer(ViewModel)가 사용할 수 있도록 연산 or 변환을 해둬야 하기도 한다. 당연히 RxJava와 Coroutine flow는 이러한 과정을 위해 중간 연산자에 대한 기능을 제공한다. Coroutine flow의 대표적인 중간 연산자로는 map(데이터 변형), filter(데이터 필터링), onEach(모든 데이터마다 연산 수행) 등이 존재한다.
class DataSource( private val restApi : RestApi){
fun getUserInfoFlow() : Flow<List<UserInfo>> = flow {
while(ture){
val userInfos = restApi.fetchLastedUserInfo()
emit(userInfos)
delay(60000)
}
}
}
//지속적으로(60초마다) 데이터를 가져와 발행해준다.
class Repository(private val dataSource : DataSource){
fun getMyInfo(uid : String){
dataSource.getMyInfoFlow().map{ it.filter{ this.uid == uid } }
}
}
//함수의 인자로 전달한 name 값과 동일한 name을 가진 데이터만 (필터링하여) Consumer에게 전달한다.
class ViewModel(private val repository : Repository) : ViewModel(){
//livedata 변수 myInfo
fun collectMyInfo(uid : String){
viewModelScope.launch{
repository.getMyInfo(uid).collect{ -> it
myInfo.value = it
}
}
}
}
//새 데이터로 liveData 값 갱신(collect를 이용해 전달된 데이터를 소비)
앞서 Producer ➡️ Intermediary ➡️ Consumer 클래스 순으로 데이터 스트림을 가지고 왔다. 이제 데이터를 저장하기 위해 데이터 홀더 클래스의 도움이 필요하다. Consumer는 데이터를 소비하는 클래스로써, MVVM에서 ViewModel 클래스가 해당 역할을 수행한다. ViewModel의 데이터 홀더 클래스, LiveData가 곧바로 떠오른다. LiveData와 Coroutine Flow를 함께 사용하여 반응형 프로그래밍이 가능하다. 하지만 제목처럼 알 수 있 듯이 StateFlow라는 기능이 또다른 선택지가 될 수 있다. 그렇다면 왜 ViewModel의 대표적인 데이터 홀더 클래스인 LiveData를 제치고 StateFlow라는 또다른 선택지가 등장한 것일까?
그렇다면 LiveData를 Flow의 방식으로 대체해야 하는데 이러한 방식엔 문제가 있었다.
이를 위해 Kotlin 1.41 버전에서 Stable API로 등장한 것이 SharedFlow, StateFlow이다. StateFlow는 SharedFlow의 한 종류인데, LiveData에 가장 가까운 기능이다. StateFlow는 현재 상태와 새로운 상태 업데이트를 collector 에 내보내는 Observable State holder flow이다. LiveData를 대체할 수 있도록 나왔기 때문에, Livedata 처럼 value 프로퍼티를 통해 현재 상태 값을 읽을 수 있다. 앞서 정리하자면 StateFlow는 데이터 홀더(LiveData) 역할을 하면서 Flow의 데이터 스트림 역할까지 한다. 따라서
이제 MVVM 패턴에서 LiveData가 사용되는 자리에 StateFlow로 대체하는 것이 가능해졌다. 하지만 StateFlow는 Flow를 위한 데이터 홀더 클래스이기 때문에, 다음과 같은 것이 다르다.
CoroutineFlow + LiveData
class MyViewModel {
private val _myUiState = MutableLiveData<Result<UiState>>(Result.Loading)
val myUiState: LiveData<Result<UiState>> = _myUiState
// Load data from a suspend fun and mutate state
init {
viewModelScope.launch {
val result = ...
_myUiState.value = result
}
}
}
CoroutineFlow + StateFlow
class MyViewModel {
private val _myUiState = MutableStateFlow<Result<UiState>>(Result.Loading)
val myUiState: StateFlow<Result<UiState>> = _myUiState
// Load data from a suspend fun and mutate state
init {
viewModelScope.launch {
val result = ...
_myUiState.value = result
}
}
}
LiveData와 StateFlow의 사용법이 별반 차이가 없다.
CoroutineFlow +LiveData
class MyViewModel(...) : ViewModel() {
val result: LiveData<Result<UiState>> = liveData {
emit(Result.Loading)
emit(repository.fetchItem())
}
}
//liveData{} 는 코루틴 빌더
CoroutineFlow + StateFlow
class MyViewModel(...) : ViewModel() {
val result: StateFlow<Result<UiState>> = flow {
emit(repository.fetchItem())
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000), // Or Lazily because it's a one-shot
initialValue = Result.Loading
)
}
LiveData와 StateFlow의 각각 사용법의 차이가 있다. 바로 StateFlow 사용 코드에서 flowBuilder에 stateIn() 메서드를 사용하여 Flow를 StateFlow 객체로 변환해준 것이다.
Flow을 StateFlow로 변환해주는 메서드이다.
stateIn 메서드가 필요한 이유는 리액티브 프로그래밍에 있다. 리액티브 프로그래밍을 할 때, 여러 데이터 스트림(flow)를 하나의 데이터 스트림(flow)로 만들어 낸다. 왜 합치는 것일까. 예를 들어 우리가 영화 평점 앱을 만드는 경우, 영화 정보, 사용자 정보, 영화 평점 등의 정보을 합쳐 하나의 객체로 만들어야되기 때문이다. 하나로 만들어진 Flow는 UI에서 사용되기 위해 StateFlow로 변환되어야 한다. UI는 이 StateFlow를 구독하고 있는 것이다.
이러한 과정을 수행하기 위해서 Flow를 StateFlow(데이터 홀더)로 변환하는 로직이 필요한 것이다. 또한 LiveData와 달리 StateFlow는 View LifeCycle을 감지하지 않는다. 따라서 StateFlow가 항상 Flow를 구독하여 메모리 누수가 발생하지 않도록, CoroutineScope를 명시해줘야 한다. 우리는 이를 stateIn 메서드를 통해 작업한다. stateIn()가 필요로 하는 세 가지 변수는 다음과 같다.
val stringFlow : Flow<String> = flow{
for(1 in 0..1000){
emit("integer: $i")
delay(1000)
}
}
val stateFlow = stringFlow.stateIn(
initialValue = "integer 0",
started = SharingStarted.WhileSubscribed(5000),
scope = viewModelScope
)
RxJava vs Coroutine 1
RxJava vs Coroutine 2
Kotlin World - CoroutineFlow 참고 글
LiveData vs StateFlow 참고 글