꽤나 예전부터 회사에서 쓰고 있던 레거시로 RxJava가 있는데, 우린 이 RxJava를 쓰는 걸 지양하자고 이야기를 나눴다. RxJava를 완전 도려내기엔 생각보다 많이 쓰이고, 엮여 있었기 때문이었다.
우리 프로젝트에서 deprecated 시키고자 했는데, 기존에 쓰고 있던 코드들은 어떻게 바꿔야할까?
api 호출은 java를 kotlin으로 바꾸고 Single Language로 통일하면서 코루틴으로 변경하면 되는데 문제는 EventBus였다.
RxJava를 이용해서 썼던 RxBus(a.k.a eventbus)는 어떤 걸로 대체해야하며, 우리 프로젝트에 알맞은 게 뭘까?
일단 먼저 기존에 썼던 RxJava
를 쓰면 어떤 문제가 있는지 살펴보자
Q) 설명만 보면 앱 개발에 꼭 필요한 라이브러리 아닌가요?
-> 물론 좋은 라이브러리다. 그치만 개발하고자 하는 프로젝트에 과연 맞는지 생각해보자.
-> RxJava 기능을 반도 쓰지 못하고 그저 data stream으로만 이용할 뿐이라면 우린 이 라이브러리를 쓰는게 적합할까?
-> Google Android에서는 계속해서 better한 android 개발을 위해 좋은 라이브러리를 내고, 밀고 있다. 공식에서 os에 맞는 라이브러리를 제공해주는데 사용을 마다할 필요가 있을까?
-> 또한, Rx에는 몇 가지 단점들이 있다. 아래에서 마저 알아보자
단점 중 하나는 메모리 누수다.
메모리 누수는 보통 참조가 완료되었지만 할당한메모리를 해제하지않아서 발생한다. 특히 강한 참조의 경우 가비지 컬렉터가 메모리에서 객체를 제거할 수 없으므로 라이프사이클에 맞게 참조를 끊어야 사용하지 않는 메모리를 해제할 수 있다. 이는 시스템에도 영향이 가기 때문에 잘 관리해야 한다.
한 예시로 아래 코드가 있다.
DisposableObserver<String> observer = new DisposableObserver<String>() {
@Override
public void onNext(String s) {
textView.setText(s);
}
@Override
public void onError(Throwable e) { }
@Override
public void onComplete() { }
};
mDisposable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello Rx!");
e.onComplete();
}
}).subscribeWith(observer);
}
그냥 막상 보면 누수가 어디있는지 모르겠다. 정상 작동할 거처럼 생겼지만, 메모리 누수가 있는 코드이다.
Observable은 안드로이드 컨텍스트를 복사하여 유지하고 onComplete(), onError() 함수가 호출되면 내부에서 자동으로 unsubscribe() 함수를 유지한다.
그런데 subscriber가 텍스트뷰를(textView.setText(s);
) 참조하기 때문에 액티비티가 비정상적으로 종료되면 텍스트뷰가 참조하는 액티비티는 종료해도 가비지 컬렉션의 대상이 되지 못한다. 따라서 메모리 누수가 발생한다.
물론 대안으로 여러가지가 있다.
1) RxLifecycle 라이브러리 사용 -> 액티비티 부모를 RxAppCompatActivity으로 변경하고 compose() 함수를 이용해야 함
2) CompositeDisposable 클래스 이용 -> 직접 개발자가 라이프사이클 맞춰서 dispose()
시켜야 함
그래도 불구하고 이런 의문이 드는 것이다.
굳이 개발자가 라이브러리를 여러개 써야하고 라이프사이클에 맞춰 코드를 넣어줘야할까? 만약 코드 넣는 걸 잊었다면? 이미 Android는 Jetpack 라이브러리를 통해 안드로이드 앱 개발자에게 좋은 라이브러리를 제공하려고 하는데 불편한 라이브러리를 고집해야할 이유가 있을까?
(RxKotlin, RxAndroid도 RxJava에서 문법적으로 플랫폼적으로 잘 쓰기위해 develop한 reactive 기반이므로 비슷하다. 그러므로 이하 생략하겠다)
또한, 단점 중 하나로 Rx의 가독성은 좋지 않다. 많은 기능을 제공하는 라이브러리다 보니 많은 콜백함수와 러닝커브가 존재한다.
개발자는 다른 말로 Code Reader이므로 가독성도 중요하단 걸 잊지말자
그럼 대체 뭐를 사용해야할까? Rx를 대체해서 어떤 라이브러리를 사용해야할까?
이렇게 2개를 꼽아보고 싶다.
+) 안드 앱 개발자 사이에서 api 호출 및 Thread 관리에는 이미 Kotlin Coroutines을 쓰는게 명백해졌으니 rx vs coroutine
은 이 포스팅에서 다루지 않겠다.
Flow는 코루틴 기반으로 빌드되며 여러 값을 제공할 수 있습니다. Flow는 비동기 데이터 스트림의 개념입니다. 내보낸 값은(emitted value) 동일한 유형이어야 합니다. 예를 들어
Flow<Int>
는 Int 값을 내보내는 흐름입니다.
출처: 안드로이드 Flow 한글 공식 문서
코루틴 기반으로 만들어진 라이브러리로 비동기 패러다임을 갖고 있다.
자세한 건 예전에 이런 포스팅을 했으니 궁금하다며 읽어보길 추천한다!
그렇다면 자세하게 Rx와 다른 점이 무엇일까?
-> 생명주기에 따른 메모리 관리
Rx 의 경우는 CompositeDisposable 등을 만들어서 그곳에서 관리를 해주어야 한다.
private val disposables = CompositeDisposable()
private fun initFeed(){
val disposable = feedRepo.initFeed()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when(results.status) {
LOADING -> // do loading..
SUCCESS -> // do success with results.data
ERROR -> // do error..
}
}
disposables.add(disposable)
}
private clearDisposables(){
disposables.clear()
}
Flow 가 init 될 때 launchIn 을 통해 lifecycle 이 정의된다.
viewModelScope 는 기본적으로 Dispatchers.Main.immediate thread 를 사용하며, android ViewModel 과 생명주기를 함께 한다. 그래서 main thread 에서 view state 를 return 하는 것이면 충분하다.
feedRepo.initFeed().onEach { results ->
when (results.status) {
LOADING -> // do loading..
SUCCESS -> // do success with results.data
ERROR -> // do error..
}
}.launchIn(viewModelScope)
-> 코드 양의 차이
만약 연락처 앱을 만든다고 가정하고, 연락처 storage에 접근해야한다고 한다면?
@Singleton
class ContactRepository @Inject constructor(
private val storage: DataSource
) {
fun getAll(): Observable<Contact> = Observable.create { emitter ->
try {
storage.getAll().forEach { contact ->
emitter.onNext(contact)
}
emitter.onComplete()
} catch (e: Exception) {
emitter.onError(e)
}
}
}
@Singleton
class ContactRepository @Inject constructor(
private val storage: DataSource
) {
fun getAll(): Flow<Contact> = flow {
storage.getAll().forEach { contact ->
emit(contact)
}
}
}
Kotlin Flow는 몇 가지 확장 기능 을 포함하여 더 간단하지만 확장 가능한 API를 제공한다. API는 언어에 잘 통합된 느낌을 주는 "Kotlinic"
RxJava에서 Observable.create { emitter -> ... }
빌더가 flow { ... }
빌더로 대체되고 명시적인 try-catch를 제거할 수 있다. Flow는 자동으로 데이터 스트림을 닫고 데이터 소스에서 예외가 발생하면 상위 계층으로 전달된다.
그럼에도 불구하고 기존 RxJava
는 워낙 operator도 많고 확장 가능한 함수가 많은 api이므로 "아 이런건 Flow는 안돼?" 같은 불만이 있을 수 있다. 또한, Flow는 기본적으로 코루틴에서 value를 순차적으로 emit하므로 Cold Stream 형태를 띄고 있다.
그래서 Flow를 상속받는 Hot Stream인 SharedFlow
, StateFlow
를 소개하고자 한다.
- Cold stream
collect() (또는 이를 subscribe 할 때)를 호출할 때마다 flow block이 재실행 된다. 즉 1~10까지 emit 하는 flow가 있다면 collect 할때마다 1~10을 전달 받는다. 여러곳에서 collect를 호출하면 각각의 collect에서 1~10을 전달받는다.
- Hot Stream
collect (또는 이를 subscribe 할때)를 호출하더라도 flow block이 호출되지 않는다. collect() 시점 이후에 emit 된 데이터를 전달받는다.
쉽게 말하면, 값을 캐싱하고 있으며 등록된 collector 들에게 새로운 값을 전달한다.
이 두 가지의 상속 관계는 다음과 같다
Flow <- SharedFlow <- StateFlow
StateFlow : Rx의 BehaviorSubject와 비슷하다.
StateFlow는 값이 업데이트 된 경우에만 반환하고 동일한 값을 반환하지 않는다.
StateFlow는 기본적으로 같은 값은 emit 하지 않기에 distinctUntilChanged 연산자의 기능을 가지고 있다. 그래서 업데이트될 때 같은 값이라면 필터링 된다.
initial value를 갖고 있기에 항상 값을 가지고 있고 value를 통해 현재 갖고 있는 값에 바로 접근할 수 있다.
SharedFlow : Rx의 PublishSubject 유사하다.
StateFlow와는 달리 replay 를 통해 캐싱하고 있을 값의 개수를 정할 수 있다.
값의 버퍼가 가득 차면 어떤 일이 발생하는지 정의할 수 있다. (BufferOverFlow를 통해 버퍼가 가득 찬 경우의 대처법을 설정 가능)
그래도 이 둘의 차이가 이해가 가지 않을 수 있다. 이 둘은 어느 상황에서 적용하는게 맞을까?
우리는 애플리케이션을 개발할 때 필연적으로 “상태”와 “이벤트” 를 다루게 된다.
MVVM 아키텍처를 이용할 경우, 보통 UI 와 관련된 “상태” 들은 ViewModel (VM) 에 위치하여 ViewModel 에 바인드 된 View 들이 상태를 표현할 수 있도록 한다.
필요한 상태를 정의하고, 애플리케이션은 정의 된 상태들로 전환해 가면서 실행될 것이다. 이러한 애플리케이션 “상태” 와 함께 우리가 애플리케이션을 개발할 때 고려해야 하는 다른 한가지는 바로 “이벤트” 이다.
그럼 Subscriber 입장에서 상태와 이벤트는 어떻게 다를까?
물론 LiveData를 써도 된다. 그치만 이 라이브러리도 상황에 맞게 써야한다.
Android 에서는 ViewModel 에서 이런 상태 데이터를 다루기 위한 장치로 LiveData 를 제공하고 있다.
LiveData 는 Android lifecycle에 종속성을 가지므로 편리하게 사용할 수 있다. 다만, LiveData 는 Lifecycle 이 있는 UI 와 인터렉션 하도록 디자인 되었으며 비지니스 레이어에서의(도메인 및 데이터 레이어까지 포괄적인 용어) 사용에는 무리가 있다. 특히, 비지니스 레이어를 플랫폼 독립적으로 가져가고자 한다면 더욱이 그렇다.
Flow는 이런 종속성 없이 비즈니스 레이어에 사용할 수 있다.
(StateFlow의 경우 초기 상태를 생성자에 전달해야 하지만 LiveData의 경우는 전달하지 않기도 한다.)
그리고 뷰가 STOPPED 상태가 되면 LiveData.observe()는 소비자를 자동으로 등록 취소가 되서, 이런 livedata 특성을 StateFlow를 갖지 않잖아요? 라고 의문이 들 수 있다
왜냐하면 StateFlow 또는 다른 Flow에서 collect {}
하는 경우 자동으로 수집을 중지하지 않기 때문인데, 이 또한 해결방법이 있다.
소비자 자동 등록 취소처럼 동일한 동작을 실행하려면 Lifecycle.repeatOnLifecycle 블록에서 Flow을 collect {}
하면 된다.
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
물론, Flow는 Android ViewModel 에서 AndroidX 를 통해 제공되는 Coroutine viewModelScope 를 이용하여 ViewModel lifecycle 에 바인딩 되어 편리하게 사용할 수 있기도 하다.
종속성은 없되, lifecycle을 따를 수 있단 점에 있어 매력적으로 보인다.
그리고 Flow 가 Rx Observable 과 비교하여 갖는 강력한 장점 중 하나는 Flow chain 에서 suspend 함수
를 사용할 수 있다. 이것은 코드의 많은 부분에서 가독성을 높이고 유지보수를 용이하게 해줄 수 있을 거다.
위에서 잠깐 언급한거처럼 stateflow는 init value를 지정할 수 있다. 또한 이미 업데이트 됐던 값에 대해 필터링이 가능하므로 상태 변경에서 주로 쓰는게 용이하다.
@HiltViewModel
class MainViewModel(
private val refreshDataUseCase: RefreshDataUseCase,
@Assisted private val savedStateHandle: SavedStateHandle,
) : ViewModel() {
private val _dataState: MutableStateFlow<UiState<List<MembershipUiModel>>>
= MutableStateFlow(UiState.InProgress())
val dataState = _dataState.asStateFlow()
fun refreshData() = viewModelScope.launch {
runCatching {
refreshDataUseCase.execute()
}.onSuccess { data ->
_dataState.emit(UiState.Success(data))
}.onFailure { throwable ->
_dataState.emit(UiState.Fail(throwable))
}
}
}
State는 기본 값을 갖고 작동하지만, 이벤트는 기본값 없이 특정 상황이 발생했을 때 subscriber들에게 발생한 상황을 특정 이벤트 형태로 전달한다.
일반적으로 UI Layer 에서 사용자와 뷰의 인터렉션으로 인해 발생한 이벤트를 정의하여 전달하거나, 시스템에 발생 한 메모리 부족, 인증 오류 발생 등의 이벤트에 관련 컴포넌트들이 대응할 수 있도록 하기 위해 사용한다.
SharedFlow가 이벤트에 적합하다고 여기는 이유에 대해서 예제를 통해 더 알아보자
@HiltViewModel
class MainViewModel(
@Assisted private val savedStateHandle: SavedStateHandle,
) : ViewModel() {
private val _systemEvent: MutableSharedFlow<Unit> =
MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val systemEvent = _systemEvent.asSharedFlow()
init {
viewModelScope.launch {
systemEvent.collect { systemEvent ->
when(systemEvent) {
is SystemEvent.MemoryWarning -> { TODO() }
is SystemEvent.StorageWarning -> { TODO() }
else -> { }
}
}
}
}
fun reportSystemEvent(systemEvent: SystemEvent) {
_systemEvent.emit(systemEvent)
}
}
전체적인 구조는 StateFlow 와 유사하다고 볼 수 있다. 다만 MutableSharedFlow 생성 시 몇 가지 옵션을 전달하여 SharedFlow 의 동작을 재정의 할 수 있다.
replay = 0 : 새로운 구독자에게 이전 이벤트를 전달하지 않음
extraBufferCapacity = 1 : 추가 버퍼를 생성하여 emit 한 데이터가 버퍼에 유지되도록 함
onBufferOverflow = BufferOverflow.DROP_OLDEST : 버퍼가 가득찼을 시 오래된 데이터 제거
BufferOverflow.SUSPEND : buffer가 꽉 찼을 때 emit을 수행하면 emit 코드가 blocking. 즉, buffer의 빈자리가 생겨야 emit 코드 이후의 코드가 수행 가능.
BufferOverflow.DROP_OLDEST : buffer가 꽉 찼을 때 emit을 수행하면 오래된 데이터 부터 삭제하면서 새로운 데이터를 넣기.
BufferOverflow.DROP_LATEST : buffer가 꽉찼을때 emit을 수행하면 최근 데이터를 삭제하고 새로운 데이터를 넣기.
위 세가지 옵션(replay, extra...., onBuffer..) 을 통해 우리가 원하는 SharedFlow 를 생성할 수 있으며, 이는 우리가 RxJava/Kotlin 에서 사용하던 PublishSubject 와 유사하다고 할 수 있다.
+) 번외
MutableSharedFlow의 내부 코드
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
여기까지 달려오느라 고생했다! EventBus를 과연 어떤 라이브러리로 대체할 것인가.. 에 대해 말하다보니 간략하게 정리한다해도 개념 정리 위주로 달려온 거 같다.
지금까지 글을 적은 걸 보면 눈치 채지 않았을까 싶다. 바로 RxJava로 EventBus로 쓸 라이브러리는 SharedFlow
를 사용할 것이다.
우리 프로젝트는 언어는 코틀린으로 가져가고 MVVM 싱글 아키텍처로 맞출 예정이다. 또한 특정 상황에서의 이벤트를 받기만 하면 되기 때문에, RxJava 대신 경량화된 Reactive streams 구현체 라고 볼 수 있는 Flow 를 이용하여 이를 구현할 것이다.
그러므로 Flow를 대체해서 사용할 것이고 이벤트를 다루는 EventBus는 SharedFlow로 구현하려고 한다.
아래 구현한 예제는 정말 초보적인 구현 example이므로 참고만 해달라 :)
MainActivity에서 아무것도 안하고 다른 activity를 돌아다니다가 wish event를 발생하면 Main에서 subscribe하고 있었기 때문에 해당 event 값 가지고 view 반영
class EventBus {
private val _events = MutableSharedFlow<WishEvent>()
private val events = _events.asSharedFlow()
suspend fun invokeEvent(event: WishEvent) = _events.emit(event)
suspend fun subscribeEvent(onEvent: (WishEvent) -> Unit) {
events.collect {
onEvent(it)
}
}
}
enum class WishEvent {
Wished,
UnWished
}
@AndroidEntryPoint
class EmitActivity : AppCompatActivity() {
private lateinit var binding: ActivityEmitBinding
private val viewModel: EmitViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = ActivityEmitBinding.inflate(layoutInflater)
setContentView(binding.root)
binding.btnWishChanged.setOnClickListener {
binding.btnWishChanged.isSelected = binding.btnWishChanged.isSelected.not()
viewModel.onWishUpdated(it.isSelected)
}
}
}
@HiltViewModel
class EmitViewModel @Inject constructor(private val eventBus: EventBus) : ViewModel() {
fun onWishUpdated(selected: Boolean) {
viewModelScope.launch {
when(selected) {
true -> eventBus.invokeEvent(WishEvent.Wished)
false -> eventBus.invokeEvent(WishEvent.UnWished)
}
}
}
}
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private lateinit var binding: ActivityMainBinding
private val viewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = ActivityMainBinding.inflate(layoutInflater)
setContentView(binding.root)
binding.btnActivityStart.setOnClickListener {
val intent = Intent(this, BActivity::class.java)
startActivity(intent)
}
observe()
}
private fun observe() {
viewModel.wishUIState.observe(this, {
binding.wish.isSelected = it
})
}
}
@HiltViewModel
class MainViewModel @Inject constructor(private val eventBus: EventBus) : ViewModel() {
private val _wishUIState = MutableLiveData<Boolean>()
val wishUIState = _wishUIState
init {
initEventBusSubscribe()
}
private fun initEventBusSubscribe() {
viewModelScope.launch {
eventBus.subscribeEvent {
when (it) {
WishEvent.Wished -> _wishUIState.value = true
WishEvent.UnWished -> _wishUIState.value = false
}
}
}
}
}
https://stackoverflow.com/questions/42066066/how-kotlin-coroutines-are-better-than-rxkotlin
https://myungpyo.medium.com/stateflow-%EC%99%80-sharedflow-32fdb49f9a32
https://mashup-android.vercel.app/mashup-11th/mkspace/coroutineflow/coroutine_flow/
https://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
잘읽었습니다