StateFlow, SharedFlow, CallbackFlow 비교

강현석·2022년 11월 21일
1

article

목록 보기
2/9

본 내용은 학습을 위해 Comparing StateFlow, SharedFlow, and CallbackFlow 을 보고 입맛대로 정리한 글입니다.


개념

StateFlow

  • collect를 시작할 때마다 항상 마지막에 emit한 값을 얻고 싶은 경우
  • collect를 중단하지 않음

SharedFlow

  • collect를 시작할 때, 최근에 collect된 emit한 값을 얻고 싶은 경우
  • collect를 중단하지 않음

CallbackFlow

  • collect를 시작하고 어느 시점에 중지하고 싶은 경우
  • 가장 최근에 collect된 emit한 값만 collect

Flow 초기화

StateFlow

val stateFlow = MutableStateFlow(0)

fun emitStateData(value : Int) {
    CoroutineScope(Dispatchers.IO).launch {
        println("emitted $value")
        stateFlow.emit(value)
    }
}

fun collectStateData() = CoroutineScope(Dispatchers.Default).launch {
    stateFlow.collect {
        println("collected $it")
    }
}
  • 초기값 설정을 요구함
  • emit되기 전에 collect가 일어나면, 초기값이 emit됨

SharedFlow

val sharedFlow = MutableSharedFlow<Int>()

fun emitSharedData(value : Int) {
    CoroutineScope(Dispatchers.IO).launch {
        println("emitted $value")
        sharedFlow.emit(value)
    }
}

fun collectSharedData() = CoroutineScope(Dispatchers.Default).launch {
    sharedFlow.collect {
        println("collected $it")
    }
}
  • 초기값 설정을 요구하지 않음
  • 버퍼 크기를 설정하지 않으면, 기본값은 0

CallbackFlow

var sendData: (data: Int) -> Unit = { }
var closeChannel: () -> Unit = { }

val callbackFlow = callbackFlow {
    sendData = { data -> 
        println("callback send $data")
        trySend(data) 
    }
    closeChannel = { close() }
    awaitClose {
        sendData = {}
        closeChannel = {}
        println("Close CallbackFlow")
    }
}

fun collectCallbackData() = CoroutineScope(Dispatchers.Default).launch {
    callbackFlow.collect {
        println("callback collect $it")
    }
}

collection 전에 emit

이해를 돕기 위해, 이후 모든 예제 코드 중 delay를 개행(줄바꿈)으로 대체

StateFlow

fun main(): Unit = runBlocking {

    println("Emit 1 before collect")
    emitStateData(1)

    println("Collect started")
    val job1 = collectStateData()

}

출력 결과

Emit 1 before collect
emitted 1

Collect started
collected 1

  • emit된 값은 저장됨
  • collect시 저장된 값 emit

SharedFlow

fun main(): Unit = runBlocking {

    println("Emit 1 before collect")
    emitSharedData(1)

	println("Collect started")
    val job1 = collectSharedData()

}

출력 결과

Emit 1 before collect
emitted 1

Collect started

  • emit된 값은 저장되지 않음
  • collect시 어떤 값도 emit되지 않음

CallbackFlow

fun main(): Unit = runBlocking {

    println("Emit 1 before collect")
    sendData(1)
    
    println("Collect started")
    val job1 = collectCallbackData()

}

출력 결과

Emit 1 before collect

Collect started

  • emit된 값을 얻을 수 없음
    • collection이 시작되기 전에 callbackFlow가 비활성화 되었기 때문
    • 즉, callbackFlow 람다는 실행되지 않음

요약

  • StateFlow는 collection이 시작할 때 마지막 값을 collect
  • SharedFlow는 collection이 시작할 때 마지막 값을 collect하지 않음
  • CallbackFlow는 collection 전에 emit이 시작하는 것을 허용하지 않음

collection 이후에 emit

  • 모든 경우에 emit된 값이 collect됨

collection 이후에 이전과 같은 값 emit

StateFlow

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectStateData()

	println("Emit 2 after collect")
    emitStateData(2)

    println("Emit same value 2 again")
    emitStateData(2)

}

출력 결과

Collect started
collected 0

Emit 2 after collect
emitted 2
collected 2

Emit same value 2 again
emitted 2

  • 동일한 값이 두 번 이상 연속으로 emit되면, 필터링되어 collect되지 않음
    • 단, 처음에 초기값과 동일한 값을 emit하면, 필터링되지 않고 collect

SharedFlow와 CallbackFlow

// SharedFlow
fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectSharedData()

    println("Emit 2 after collect")
    emitSharedData(2)

    println("Emit same value 2 again")
    emitSharedData(2)

}

// CallbackFlow
fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectCallbackData()

    println("Emit 2 after collect")
    sendData(2)

    println("Emit same value 2 again")
    sendData(2)

}

출력 결과

Collect started

Emit 2 after collect
emitted 2
collected 2

Emit same value 2 again
emitted 2
collected 2

  • 이전 값과 상관 없이 emit된 모든 값을 collect

요약

  • StateFlow는 이전과 동일한 값이 collect되지 않도록 필터링함
  • SharedFlow, CallbackFlow는 항상 emit된 모든 값을 collect

collection-emit 반복

  • collect 시작 -> emit -> 새 collect 시작 -> emit 하면 어떻게 될까?

StateFlow

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectStateData()

    println("Emit a new value 3")
    emitStateData(3)

    println("Collect again")
    val job2 = collectStateData()

    println("Emit 3 after collect again")
    emitStateData(3)

}

출력 결과

Collect started
collected 0

Emit a new value 3
emitted 3
collected 3

Collect again
collected 3

Emit 3 after collect again
emitted 3

  • 첫 번째 emit(녹색)은 첫 번째와 두 번째 collection에서 collect
  • 두 번째 emit(빨간색)은 이전에 emit된 값과 동일하므로 필터링됨

만약 두 번째 값이 다르다면?

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectStateData()

    println("Emit a new value 3")
    emitStateData(3)

    println("Collect again")
    val job2 = collectStateData()

    println("Emit 4 after collect again")
    emitStateData(4)

}

출력 결과

Collect started
collected 0

Emit a new value 3
emitted 3
collected 3

Collect again
collected 3

Emit 4 after collect again
emitted 4
collected 4
collected 4

SharedFlow

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectSharedData()

    println("Emit a new value 3")
    emitSharedData(3)

    println("Collect again")
    val job2 = collectSharedData()

    println("Emit 3 after collect again")
    emitSharedData(3)

}

출력 결과

Collect started

Emit a new value 3
emitted 3
collected 3

Collect again

Emit 3 after collect again
emitted 3
collected 3
collected 3

  • emit된 값은 collect 후에 시작됨

StateFlow와의 차이점

  • collect 전에 emit된 값은 collect 하지 않음
  • 이전에 emit된 값과 동일한 값이 emit되어도 collect

CallbackFlow

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectCallbackData()

    println("Emit a new value 3")
    sendData(3)

    println("Collect again")
    val job2 = collectCallbackData()

    println("Emit 3 after collect again")
    sendData(3)

}

출력 결과

Collect started

Emit a new value 3
callback send 3
callback collect 3

Collect again

Emit 3 after collect again
callback send 3
callback collect 3

  • 각 collecter 별로 collect
  • 항상 하나의 collection이 활성화

요약

  • StateFlow, SharedFlow는 시작된 collection 수에 따라, 각 emit된 값을 collect
  • CallbackFlow는 시작된 collection 수와 관련 없이, 하나만 활성화

취소 처리

StateFlow와 SharedFlow

  • CoroutineScope에서 return되는 job 취소
// StateFlow
fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectStateData()

    println("Collect again")
    val job2 = collectStateData()

    println("Emit 4 after collect again")
    emitStateData(4)

    job1.cancel()
    println("Cancel Job 1")

    println("Emit 5 after cancel job1")
    emitStateData(5)

    job2.cancel()
    println("Cancel Job 2")

    println("Emit 6 after cancel job1")
    emitStateData(6)

}

//SharedFlow
fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectSharedData()

    println("Collect again")
    val job2 = collectSharedData()

    println("Emit 4 after collect again")
    emitSharedData(4)

    job1.cancel()
    println("Cancel Job 1")

    println("Emit 5 after cancel job1")
    emitSharedData(5)

    job2.cancel()
    println("Cancel Job 2")

    println("Emit 6 after cancel job2")
    emitSharedData(6)

}

출력 결과

Collect started
collected 0          <-- StateFlow

Collect again
collected 0          <-- StateFlow

Emit 4 after collect again
emitted 4
collected 4
collected 4

Cancel Job 1

Emit 5 after cancel job1
emitted 5
collected 5

Cancel Job 2

Emit 6 after cancel job2
emitted 6

  • 취소는 각 코루틴 Job에 개별적으로 수행됨

CallbackFlow

fun main(): Unit = runBlocking {

    println("Collect started")
    val job1 = collectCallbackData()

    println("Collect again")
    val job2 = collectCallbackData()

    println("Emit 4 after collect again")
    sendData(4)

    println("Cancel Job 1")
    job1.cancel()

    println("Emit 5 after cancel job1")
    sendData(5)

    job2.cancel()
    println("Cancel Job 2")

    println("Emit 6 after cancel job2")
    sendData(6)

    println("Close Channel")
    closeChannel()

}

출력 결과

Collect started

Collect again

Emit 4 after collect again
callback send 4
callback collect 4

Cancel Job 1
Close CallbackFlow

Emit 5 after cancel job1
Close CallbackFlow

Cancel Job 2

Emit 6 after cancel job2

Close Channel

  • single collection이 중지됨
  • collect, emit 불가
  • 작업이 취소될 때마다 awaitClose {...} 만 호출됨 (빨간색 사각형)
  • job을 stop하기 전에 close()를 호출하여 채널을 닫으면, 두 코루틴에 awaitClose {...}가 실행됨 (빨간색 사각형)

요약

  • StateFlow와 SharedFlow는 각 코루틴들을 개별적으로 취소 처리를 해줘야함
  • CallbackFlow는 CoroutineScope 취소가 트리거되면, collect 또는 emit할 수 없음
    • closeChannel 를 사용하여 취소를 수행하면, 모든 것이 자동으로 취소되고 awaitClose {...} 를 호출함
    • close()를 직접 호출하여 callbackFlow 람다 내에서 자체 취소를 수행할 수 있음

전체 요약

profile
볼링을 좋아하는 안드로이드 개발자

1개의 댓글

comment-user-thumbnail
2022년 12월 15일

아주 좋은 비교 글 감사합니다!!!

답글 달기