[Kotlin] Flow에 대하여 (feat: RxJava)

Falco·2023년 4월 24일
1

Android

목록 보기
49/55
post-thumbnail

ReactiveX와 Rxjava에 관하여

지난번에는 ReactvieX와 Rxjava에 대하여 간단하게 알아보았다.

간단하게 요악하자면

Rxjava에서는 쓰레드와 스케줄러를 활용하여 반응형 프로그래밍을 제공한다.
ObservableOperator를 활용해 데이터스트림을 조작하고 구독하며 비동기 작업을 수행한다.

안드로이드 개발에서도 RxJava는 지속해서 쓰이고 있지다. 현재 2.x 버전은 유지보수 모드로 버그 픽스만 적용될 뿐 새로운 기능 업데이트는 없고 현재 3.x버전이 업데이트 되고 있다. [RxJava - Github]


Flow에 관하여

Kotlin에서는 Flow는 비동기 데이터 스트림 처리 라이브러리이다. 코루틴을 기반으로 작동하며 반응형 프로그래밍을 구현할 수 있다.

// Dependency
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")

기본적으로 반응형 프로그래밍은 Producer(생산자), Intermediary(중간 연산자), Consumer(소비자)로 이루어진다.

다음은 간단한 숫자를 생산하고 소비하는 예제이다.

// Producer
val emitter = flow<Int> {
    (0..100).forEach {
        println("emit $it")
        emit(it)
    }
}


fun main() = runBlocking {
    emitter
        // Intermediary
        .filter {
            it % 2 == 0
        }
        // Consumer
        .collect {
            println("collect: $it")
        }
}

이런 Flow는 기본적으로 Cold Stream이며 상태를 가지지 않으며, 새로운 데이터가 들어올 때 마다 새로운 스트림이 형성된다.

예시로는 TVOTT 생각해보자. TV는 켰을 때 현재 방송중인 프로그램(방출중인 데이터)를 수신해야 하지만, OTT는 기존의 보았던 시간때부터 다시 프로그램을 볼 수 있다.(이전 상태를 기록하고 있음)

Cold Stream을 활용하면 사용자가 핸드폰 화면을 돌릴 때 마다 데이터가 초기화 될 것이고, 서버 또는 DB로부터 데이터를 다시 가져와야 한다.

이를 방지하기 위해 StateFlow를 제공한다. 이는 Hot Stream이며 상태를 가지는 동시에 데이터 스트림의 역할까지 수행한다. StateFlowViewModel단에 저장하고 화면이 돌아가도 데이터 홀더의 역할을 수행하기에 데이터를 다시 불러올 필요가 없다.

Android Developer - 상태 생성 및 관리

data class DiceUiState(
    val firstDieValue: Int? = null,
    val secondDieValue: Int? = null,
    val numberOfRolls: Int = 0,
)

class DiceRollViewModel : ViewModel() {

    private val _uiState = MutableStateFlow(DiceUiState())
    val uiState: StateFlow<DiceUiState> get() = _uiState.asStateFlow()

    // Called from the UI
    fun rollDice() {
        _uiState.update { currentState ->
            currentState.copy(
            firstDieValue = Random.nextInt(from = 1, until = 7),
            secondDieValue = Random.nextInt(from = 1, until = 7),
            numberOfRolls = currentState.numberOfRolls + 1,
            )
        }
    }
}

Fragment, Activity등의 생명주기에 맞추어 uiStateObserving하며 뷰의 업데이트 수행한다.

lifecycleScope.launch {
	repeatOnLifecycle(Lifecycle.State.STARTED) {
    	viewModel.uiState.collect { uiState ->
        
        }
	}
}

옵저빙을 수행하며 뷰를 업데이트할 때 Livedata를 활용하는 것과 어떤 차이가 있을까?는 LiveData 및 StateFlow에 대하여를 참고해보자!


참고) Compose에서는 Flow말고 State를 활용하여 상태를 저장하고, 이를 옵저빙할 필요가 없다!

Compose에서의 UI 상태 관리 소스

@Stable
interface DiceUiState {
    val firstDieValue: Int?
    val secondDieValue: Int?
    val numberOfRolls: Int?
}

private class MutableDiceUiState: DiceUiState {
    override var firstDieValue: Int? by mutableStateOf(null)
    override var secondDieValue: Int? by mutableStateOf(null)
    override var numberOfRolls: Int by mutableStateOf(0)
}

class DiceRollViewModel : ViewModel() {

    private val _uiState = MutableDiceUiState()
    val uiState: DiceUiState = _uiState

    fun rollDice() {
        _uiState.firstDieValue = Random.nextInt(from = 1, until = 7)
        _uiState.secondDieValue = Random.nextInt(from = 1, until = 7)
        _uiState.numberOfRolls = _uiState.numberOfRolls + 1
    }
}

Flow를 응용해보자.

Flow는 데이터 파이프라인을 생성하며 이 데이터는 collect에 의해 소비된다.

다음 설명내용은 해당 블로그의 내용들을 [collect와 collectLatest의 차이점]를 많이 참고하였다.

데이터의 지연

위에서 예제로 보았던 emitter를 그대로 들고오자.

val emitter = flow<Int> {
    (0..20).forEach {
    	delay(100)
        println("emit $it")
        emit(it)
    }
}

그리고 데이터를 소비하는데 엄청난 시간이 걸리는 소스를 작성해보자.

fun main() = runBlocking {
    emitter
        .collect {
        	// 엄청난 처리중...
            delay(Long.MAX_VALUE)
            println("collect: $it")
        }
}

특정 데이터를 처리하기위해 많은 시간이 소요된다고 하면 그다음 데이터를 처리하는데 엄청난 지연이 발생할 것이다.

소비가 안돼...

이 이유는 Flowcollect함수는 이전 데이터의 소비가 끝나야지 다음 데이터를를 소비하기 때문이다.

이를 방지하기위해 collectLatest를 활용할 수 있다.

이는 새로운 데이터 스트림이 들어오면 기존의 처리를 강제 종료시키고 새로운 데이터를 처리한다.

fun main() = runBlocking {
    emitter
        .collectLatest {
            delay(200)
            println("collect: $it")
        }
}

0.2초를 기다리고 데이터를 소비하는데 0.2초가 지나기전에 새로운 데이터 스트림이 들어와 마지막 20만이 소비되는 것을 볼 수 있다.

이것이 collectLatest의 한계점이기도 하다. 해당 데이터를 처리하는 속도가 emit하는 속도보다 느리다면 마지막 데이터스트림까지 뷰에 아무런 업데이트도 수행되지 않을 것이다.

이를 방지하기 위해서는 conflate()를 활용할 수 있다. 이는 한번 시작 된 데이터 스트림의 소비는 끝날 때 까지 수행하고, 끝난 시점에서의 가장 최신 데이터를 소비하는 것이다.

데이터를 발행하는데 0.1초가 걸리고 소비가 0.2초 걸릴 때 결과는 다음과 같다.

fun main() = runBlocking {
    emitter
        .conflate()
        .collect() {
            delay(200)
            println("collect: $it")
        }
}

0이 소비됬을 때 최신 상태인 1을 소비하고 1이 소비되었을 때 최신 상태인 3을 소비한다.

collect의 발행과 소비의 방식

flowcollect를 활용하면 하나의 Coroutine에서 발행과 소비가 같이 일어나기 때문에 데이터를 다 소비한 후 다음 데이터가 발행된다.

발행과 소비에 모두 시간이 오래걸린다면 이것은 매우 비효율적인 코드가 될 것이다.

val emitter = flow<Int> {
    (0..10).forEach {
        delay(1000)
        println("emit $it")
        emit(it)
    }
}


fun main() = runBlocking {
    emitter
        .collect {
            delay(2000)
            println("collect: $it")
        }
}

--------

emit 0	// 1초
collect: 0 // 3초
emit 1 // 4초
collect: 1 // 6초

이를 방지하기 위해 buffer를 활용하여 발행과 소비를 위한 코루틴을 분리할 수 있다.

buffer()란??

지정된 용량의 채널을 통해 흐름 방출을 버퍼링하고 별도의 코루틴에서 컬렉터를 실행합니다.

간단하게 말하자면 발행을 위한 코루틴 채널을 따로 분리한다고 생각하면 된다.

기본적으로 flow의 발행과 소비는 순서적이며 Q자료구조를 가지고 실행된다.

하지만 Buffer()를 활용하면 데이터를 새로운 코루틴에서 발행한다.

이를 활용하여 다음 예제를 확인해보자.

// 0.1초마다 데이터스트림 방출
fun main() = runBlocking {
    emitter
        .buffer()
        .collect {
            delay(200)
            println("collect: $it")
        }
}

데이터스트림의 방출과 소비가 각각 다른 코루틴에서 진행되는 것을 볼 수 있다.


여기서 Backpressure이라는 문제를 알고 넘어가면 좋을 것 같다.

Backpressure란?

[Backpressure in yout Kotlin Flows]

한국어로 번역하면 배압이라고 하며 간단하게 설명하면 데이터스트림의 병목현상이라고 할 수 있겠다.

데이터를 방출하는 속도보다 소비하는 속도가 느릴 때 발생한다.

RxJava에서는 Observable이 계속 쌓이면서 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있다.

하지만 Kotlin Flow에서는 위에 예시들을 활용하여 이를 관리할 수 있다.

  • collect함수는 기본적으로 순서적으로(Q)처럼 작동하여 배압현상이 일어나지 않는다.
  • collectLatest활용해 가장 최신의 데이터 스트림만 활용할 수 있다.
  • conflate()함수를 활용해 가장 최신의 데이터 스트림만 활용할 수 있다.

    하지만 buffer()를 활용하면 새로운 코루틴에 해당 값이 쌓이면서 이 backpressure현상이 발생할 수 있다. 따라서 capacity를 조절하거나, dropLatest()연산자를 활용하여 사이즈를 조절해야 한다.


플로우끼리 합치기

flatMapConcat

flatMapConcatFlow에서 각각의 데이터를 처리하기 위해 다른 Flow를 호출하고, 그 결과들을 순차적으로 결합하여 새로운 Flow를 만드는 연산자이다.

val nums = flowOf(1, 2, 3)

nums.flatMapConcat { num ->
    flow {
        emit(num * 1)
        delay(1000)
        emit(num * 2)
        emit(num * 3)
    }
}.collect { println(it) } // 1, 2, 3, 2, 4, 6, 3, 6, 9가 출력됩니다.

flatMapConcat의 Flow의 처리는 순차적이며 이전 플로우가 발행되어야 다음 Flow가 실행된다.

flatMapLatest함수도 제공하며 이는 가장 최신의 flow만 소비한다.

flatMapMerge

flatMapMerge는 각각의 다른 Flow들을 결합하여 새로운 Flow를 만드는 연산자이다.

flatMapConcat과의 차이점은 병렬적으로 실행되어 이전 처리가 완료되지 않아도 수행된다는 것이다. 따라서 순서를 보장하지 않는다.

val nums = flowOf(1, 2, 3)

nums.flatMapMerge { num ->
    flow {
        emit(num * 1)
        delay(1000)
        emit(num * 2)
        emit(num * 3)
    }
}
.collect { println(it) } 
// 1, 2, 3, 2, 3, 4, 6, 6, 9가 출력됩니다.

1, 2, 3이 먼저 출력된 후에 delay(1000)를 거치고 난 후 그다음 값들이 emit

combine

combine은 여러 개의 Flow를 동시에 처리하고, 각 Flow에서 발행한 데이터를 조합하여 새로운 데이터를 만드는 연산자이다.

이 때, 모든 Flow가 새로운 데이터를 발행할 때마다 새로운 데이터를 만들어 낸다. 이는 병렬적으로 실행된다.

val nums1 = flowOf(1, 2, 3)
val nums2 = flowOf(10, 20, 30)

nums1.combine(nums2) { a, b ->
    a + b
}
.collect { println(it) } // 11, 22, 33이 출력됩니다.

예를 들어 nums1nums2 Flowcombine하여 새로운 Flow를 만든다. 이 때, 각 Flow에서 발행한 데이터를 더하여 새로운 Flow를 만든다. 이 연산자는 각 Flow가 새로운 데이터를 발행할 때마다 실행된다.

다음 예는 Android Compose에서의 combine의 예제이다.

    val uiState = combine(
        _homeBanners, _nickName
    ) { homeBannerItems, nickName ->
        HomeUiState(
            homeBanners = homeBanners,
            nickName = nickName
        )
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = HomeUiState()
    )

예제에서는 _homeBanners_nickName이라는 Flow를 수집하고 이를 합쳐서 uiState라는 새로운 Flow를 구성한다.

profile
강단있는 개발자가 되기위하여

0개의 댓글