Rx2 - Coroutine Flow

don9wan·2021년 12월 24일
1

Android

목록 보기
17/18
post-thumbnail

Coroutine Flow?

저번 시간에 Rx 라이브러리, 즉 reactive programming을 하는 이유를 알아봤다. RxJava에 대해 공부를 하던 중 눈에 띄는 것이 하나 있는데 바로 Coroutine Flow이다. Coroutine은 비동기 작업을 간편하게 처리할 수 있게 해주는 lightweight thread이다. Coroutine Flow란 무엇일까.

"Rx is An API for asynchronous programming with observable streams"

Rx(Java)는 옵저버블 스트림으로 비동기 프로그래밍을 하기 위한 api라고 한다. 쉽게 말해선 비동기 처리를 위해 사용하는 Api이다. 그리고 Coroutine에서도 이와 같은 기능을 제공하기 위해 Coroutine Flow Api를 제공하는 것이다. 저번 시간에 Reactive Programming, 즉 Rx는 '순차적으로 실행되는 데이터 스트림을 비동기적으로 처리할 수 있다'라고 했는데, Coroutine Flow도 이와 같은 역할을 수행한다고 보면 되겠다. 이름부터 Flow니 데이터 스트림을 처리하는 무언가라고 강하게 예측된다.

Flow : 비동기 데이터 스트림 생성 함수

fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

Rx의 Publisher를 알고 있다면, 이 Flow의 의도가 무엇인지 이해가 쉽다. Int 타입의 데이터가 흐르는 데이터 스트림을 시작하는 함수이다. 100ms 간격으로 1, 2, 3의 데이터가 순차적으로 흐른다.

Flow : 비동기 데이터 처리 코드

fun main() = runBlocking<Unit> {
    val flow = foo()
    //flow 시작
    
    flow.collect { value ->
      println(value)
    }
}

위의 foo()함수로 Int타입 데이터 스트림을 만들고, 아래 collect에서 데이터 스트림에 흐르는 데이터가 관찰되면 해당 데이터를 println해주는 코드이다. flow가 publisher, collect가 subscribe()의 역할임을 알 수 있다. 결국 Coroutine Flow는 Reactive Programming을 위한 코루틴의 구성요소인 것이다.

Coroutine Flow 구성요소 (MVVM에서의)

  • new Data
    Producer(생산자, DataSource) : 데이터 (스트림) 생성
  • ⬇️flow
    Intermediary(중간 연산자, Repository) : 데이터 변환
  • ⬇️flow
    Consumer(소비자, ViewModel) : 데이터 소비
  • ⬇️Livedata(StateFlow)
    View(View) : 뷰 업데이트

저번 Reactive Programming에서 Publisher와 Subscriber를 다뤘다. 음. Coroutine flow에서 Producer와 Counsumer의 역할은 예상이 간다. 그런데 Intermediary(중간 연산자)는 무엇일까? 일단 RxJava와 Coroutine flow는 비동기 데이터 스트림에 대해 다룬다. 그리고 데이터 스트림에서 관찰되는 "모든" 데이터를 "관찰된 그대로" 사용할까?

아니다. 상황에 맞게 필요한 데이터만 가져와서 사용하거나, 데이터를 Consumer(ViewModel)가 사용할 수 있도록 연산 or 변환을 해둬야 하기도 한다. 당연히 RxJava와 Coroutine flow는 이러한 과정을 위해 중간 연산자에 대한 기능을 제공한다. Coroutine flow의 대표적인 중간 연산자로는 map(데이터 변형), filter(데이터 필터링), onEach(모든 데이터마다 연산 수행) 등이 존재한다.

Producer(DataSource)

class DataSource( private val restApi : RestApi){
   fun getUserInfoFlow() : Flow<List<UserInfo>> = flow {
      while(ture){
         val userInfos = restApi.fetchLastedUserInfo()
         emit(userInfos)
         delay(60000)
      }
   }
}
//지속적으로(60초마다) 데이터를 가져와 발행해준다.

Intermediary(Repository)

class Repository(private val dataSource : DataSource){
   fun getMyInfo(uid : String){
      dataSource.getMyInfoFlow().map{ it.filter{ this.uid == uid } }
   }
}
//함수의 인자로 전달한 name 값과 동일한 name을 가진 데이터만 (필터링하여) Consumer에게 전달한다.

Cousumer(ViewModel)

class ViewModel(private val repository : Repository) : ViewModel(){
   //livedata 변수 myInfo
   fun collectMyInfo(uid : String){
      viewModelScope.launch{
         repository.getMyInfo(uid).collect{ -> it
            myInfo.value = it
         }
      }
   }
}
//새 데이터로 liveData 값 갱신(collect를 이용해 전달된 데이터를 소비)

LiveData vs StateFlow

앞서 Producer ➡️ Intermediary ➡️ Consumer 클래스 순으로 데이터 스트림을 가지고 왔다. 이제 데이터를 저장하기 위해 데이터 홀더 클래스의 도움이 필요하다. Consumer는 데이터를 소비하는 클래스로써, MVVM에서 ViewModel 클래스가 해당 역할을 수행한다. ViewModel의 데이터 홀더 클래스, LiveData가 곧바로 떠오른다. LiveData와 Coroutine Flow를 함께 사용하여 반응형 프로그래밍이 가능하다. 하지만 제목처럼 알 수 있 듯이 StateFlow라는 기능이 또다른 선택지가 될 수 있다. 그렇다면 왜 ViewModel의 대표적인 데이터 홀더 클래스인 LiveData를 제치고 StateFlow라는 또다른 선택지가 등장한 것일까?

1. LiveData의 일반적인 장점

  • Activity 와 Fragment 는 LiveData 객체를 안전하게 관찰할 수 있고, 생명 주기가 끝나는 즉시 관찰을 멈추기 때문에 누수를 걱정하지 않아도 된다.
  • LiveData 는 옵저버 패턴을 따르기에 LiveData 는 관찰 대상인 데이터가 변경될 때 Observer 객체에 알린다. 그리고 이러한 Observer 객체를 통해 UI 를 업데이트한다면 개발자가 직접 업데이트할 필요가 없게 된다.
  • Activity 가 Back Stack 에 있을 때를 비롯하여 Observer 의 생명 주기가 비활성 상태에 있으면 Observer 는 어떤 LiveData 이벤트도 받지 않아 비정상 종료가 발생하지 않게 된다.
  • 생명 주기가 비활성화(Inactive)되었다가 다시 활성화(Active)될 때 최신 데이터를 수신한다.
  • Room 이나 Retrofit 라이브러리 등과 호환되어 함께 사용하기 좋다.

2. CoroutineFlow + LiveData의 문제점

  • 보일러 플레이트 코드를 만들어낸다. 안드로이드에서 수집하는 UIState가 한 둘이 아니다. 이들을 모두 구독하기 위해 비슷한 코드를 매번 작성해 가독성을 떨어트리는 것은 지양해야 한다.
  • Livedata는 UI에 밀접하게 연관되어 있기 때문에 Data Layer에서 비동기 방식으로 데이터를 처리하기에 자연스럽지 않다. LiveData는 비동기 데이터 스트림을 처리하도록 설계되어 있지 않는데, LiveData의 관찰은 오직 Main Thread에서만 진행되기 때문이다.
  • LiveData는 안드로이드 플랫폼에 속해 있기 때문에 순수 Java/Kotlin 을 사용해야 하는 Domain Layer에서 사용하기 적합하지 않다.

3. LiveData -> Flow로 대체 시 발생하는 문제

그렇다면 LiveData를 Flow의 방식으로 대체해야 하는데 이러한 방식엔 문제가 있었다.

  • Flow 는 스스로 안드로이드 생명주기에 대해 알지 못한다. 그래서 라이프사이클에 따른 중지나 재개가 어렵다.
  • Flow 는 상태가 없어 값이 할당된 것인지, 현재 값은 무엇인지 알기가 어렵다.
  • Flow 는 Cold Stream 방식으로, 연속해서 계속 들어오는 데이터를 처리할 수 없으며 collect 되었을 때만 생성되고 값을 반환한다. 만약 하나의 flow builder 에 대해 다수의 collector 가 있다면 collector 하나마다 하나씩 데이터를 호출하기 때문에 업스트림 로직이 비싼 비용을 요구하는 DB 접근이나 서버 통신 등이라면 여러 번 리소스 요청을 하게 될 수 있다.

4. StateFlow의 등장!

이를 위해 Kotlin 1.41 버전에서 Stable API로 등장한 것이 SharedFlow, StateFlow이다. StateFlow는 SharedFlow의 한 종류인데, LiveData에 가장 가까운 기능이다. StateFlow는 현재 상태와 새로운 상태 업데이트를 collector 에 내보내는 Observable State holder flow이다. LiveData를 대체할 수 있도록 나왔기 때문에, Livedata 처럼 value 프로퍼티를 통해 현재 상태 값을 읽을 수 있다. 앞서 정리하자면 StateFlow는 데이터 홀더(LiveData) 역할을 하면서 Flow의 데이터 스트림 역할까지 한다. 따라서

  • StateFlow는 항상 값을 가지고 있고 오직 한 가지 값을 가진다. (like LiveData)
  • StateFlow는 여러 개의 collector를 지원한다. 이는 flow가 공유된다는 의미이며 앞서 설명했던 flow의 단점(Cold Stream)과는 다르게 업스트림이 collector마다 중복으로 처리되지 않는다.
  • StateFlow는 collector수에 관계없이 항상 구독하고 있는 것의 최신 값을 받는다.
  • StateFlow는 flow builder를 사용하여 빌드된 flow가 cold stream이었던 것과 달리, hot stream이다. 따라서 collector에서 수집하더라도 생산자 코드가 트리거 되지 않고, 일반 flow는 마지막 값의 개념이 없었던 것과 달리 StateFlow는 마지막 값의 개념이 있으며 생성하자마자 활성화된다.

이제 MVVM 패턴에서 LiveData가 사용되는 자리에 StateFlow로 대체하는 것이 가능해졌다. 하지만 StateFlow는 Flow를 위한 데이터 홀더 클래스이기 때문에, 다음과 같은 것이 다르다.

StateFlow, LiveData 동작의 차이점

  • StateFlow 의 경우 초기 상태를 생성자에 전달해야 하지만, LiveData 의 경우는 전달하지 않아도 된다.
  • View 가 STOPPED 상태가 되면 LiveData.observe() 는 Observer 를 자동으로 등록 취소하는 반면, StateFlow 는 자동으로 collect 를 중지하지 않는다. 만약 동일한 동작을 실행하려면 Lifecycle.repeatOnLifecycle 블록에서 흐름을 수집해야 한다. 이는 구현 코드에서 보겠다.

Flow + Data Holder의 구현 코드 1

CoroutineFlow + LiveData

class MyViewModel {
    private val _myUiState = MutableLiveData<Result<UiState>>(Result.Loading)
    val myUiState: LiveData<Result<UiState>> = _myUiState
 
    // Load data from a suspend fun and mutate state
    init {
        viewModelScope.launch { 
            val result = ...
            _myUiState.value = result
        }
    }
}

CoroutineFlow + StateFlow

class MyViewModel {
    private val _myUiState = MutableStateFlow<Result<UiState>>(Result.Loading)
    val myUiState: StateFlow<Result<UiState>> = _myUiState
 
    // Load data from a suspend fun and mutate state
    init {
        viewModelScope.launch { 
            val result = ...
            _myUiState.value = result
        }
    }
}

LiveData와 StateFlow의 사용법이 별반 차이가 없다.

Flow + Data Holder의 구현 코드 2

CoroutineFlow +LiveData

class MyViewModel(...) : ViewModel() {
    val result: LiveData<Result<UiState>> = liveData {
        emit(Result.Loading)
        emit(repository.fetchItem())
    }
}
//liveData{} 는 코루틴 빌더

CoroutineFlow + StateFlow

class MyViewModel(...) : ViewModel() {
    val result: StateFlow<Result<UiState>> = flow {
        emit(repository.fetchItem())
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), // Or Lazily because it's a one-shot
        initialValue = Result.Loading
    )
}

LiveData와 StateFlow의 각각 사용법의 차이가 있다. 바로 StateFlow 사용 코드에서 flowBuilder에 stateIn() 메서드를 사용하여 Flow를 StateFlow 객체로 변환해준 것이다.

stateIn 메서드?

Flow을 StateFlow로 변환해주는 메서드이다.
stateIn 메서드가 필요한 이유는 리액티브 프로그래밍에 있다. 리액티브 프로그래밍을 할 때, 여러 데이터 스트림(flow)를 하나의 데이터 스트림(flow)로 만들어 낸다. 왜 합치는 것일까. 예를 들어 우리가 영화 평점 앱을 만드는 경우, 영화 정보, 사용자 정보, 영화 평점 등의 정보을 합쳐 하나의 객체로 만들어야되기 때문이다. 하나로 만들어진 Flow는 UI에서 사용되기 위해 StateFlow로 변환되어야 한다. UI는 이 StateFlow를 구독하고 있는 것이다.

이러한 과정을 수행하기 위해서 Flow를 StateFlow(데이터 홀더)로 변환하는 로직이 필요한 것이다. 또한 LiveData와 달리 StateFlow는 View LifeCycle을 감지하지 않는다. 따라서 StateFlow가 항상 Flow를 구독하여 메모리 누수가 발생하지 않도록, CoroutineScope를 명시해줘야 한다. 우리는 이를 stateIn 메서드를 통해 작업한다. stateIn()가 필요로 하는 세 가지 변수는 다음과 같다.

  • scope : StateFlow가 Flow로부터 데이터를 구독 받을 CoroutineScope를 명시한다.
  • started : Flow로부터 언제부터 구독을 할지 명시할 수 있다.
  • initialValue : StateFlow에 저장될 초기값을 설정한다.
val stringFlow : Flow<String> = flow{
   for(1 in 0..1000){
      emit("integer: $i")
      delay(1000)
   }
}
  • 따라서 다음과 같은 Flow가
val stateFlow = stringFlow.stateIn(
   initialValue = "integer 0",
   started = SharingStarted.WhileSubscribed(5000),
   scope = viewModelScope
)
  • 다음과 같은 StateFlow로 변환된다. 따라서 해당 StateFlow는 초기 저장값이 "integer 0", 구독 후 5초 후에 처음 발행, 해당 ViewModel의 생명주기 동안만 구독받는다.

RxJava vs Coroutine 1
RxJava vs Coroutine 2
Kotlin World - CoroutineFlow 참고 글
LiveData vs StateFlow 참고 글

profile
한 눈에 보기 : https://velog.io/@dongwan999/LIST

0개의 댓글