[TIL] ๐ŸŒผ24/04/18๐ŸŒผ#Kotlin Flow

0

TIL

๋ชฉ๋ก ๋ณด๊ธฐ
85/104
post-thumbnail

[TIL] ๐ŸŒผ24/04/18๐ŸŒผ#Kotlin Flow

Kotlin Flow

๐Ÿ“Œ๋„์›€๋˜๋Š” ์ง€๋‚œ ๋ธ”๋กœ๊ทธ ํฌ์ŠคํŒ…

๐Ÿ“Œ์ฐธ๊ณ ์ž๋ฃŒ

Flow

  • used to represent the stream of values that are being computed asynchronously
  • Flows are cold
    • similar to sequences
    • code inside a flow builder does not run until the flow is collected
  • Flow starts afresh every time it is collected
  • flow collection can be cancelled when the flow is suspended in a cancellable suspending function (ex. delay)

Intermediate flow operators

  • applied to an upstream flow -> returns a downstream flow
    • runs quickly
    • returns **definition of a new transformed flow
  • intermediate flow operators are cold
  • Transform operator
  • Size-limiting operators(ex.take)
    • cancel the execution of the flow when reached size limit
      (coroutine cancellation is always performed by throwing an exception)

Terminal flow operators

  • suspending functions that starts a collection of the flow
    • collect
    • conversion to collections (ex. toList, toSet)
    • get value (ex. first, single)
    • reducing a flow value (ex. reduce, fold)

Flows are sequential

Flow context

  • collection of flow always happens in the context of the calling coroutine
    = context preservation
  • wrong way to switch context of the flow
    • flow builder์˜ context preservation ์†์„ฑ ๋•Œ๋ฌธ์— ๋‹ค๋ฅธ context์—์„œ ๊ฐ’์„ emitํ•  ์ˆ˜ ์—†์Œ!
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...
  • right way to switch context of the flow = flowOn ํ•จ์ˆ˜ ์‚ฌ์šฉํ•˜๊ธฐ
    • flow works & emits in the background thread
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code i

Buffering

  • use buffer operator on flow
    -> run emmiting code of the flow concurrently with collecting code
  • Conflation
    • use a flow may not be necessary to process each value, instead, only the most recent ones
    • conflate operator is used to drop emmitted intermediate value
  • Processing the latest value
    • xxxLast operators are used to cancel code in block on new value

Composing multiple flows

  • Zip
    • combine corresponding values of two flows
  • Combine
    • produce result at each emission from either flows

Flattening flows

  • flattening a flow of flows(Flow<Flow<T>>)
  • flatMapConcat, flattenConcat
    • wait for inner flow to complete before starting to collect the next inner flow
  • flatMapMerge, flattenMerge
    • concurrently collect all the incoming flows -> merge values into a single flow
  • flatMapLatest
    • collection of the previous flow is cancelled as new flow is emitted

Flow exceptions

  • collector can use try/catch block
    -> any exception happening is caught
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
  • catch operator
    • preserves exception transparency
    • encapsulates exception handling
  • do not need try/catch block anymore
  • catches only upstream exceptions
fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" } // "Caught ..." message not printed                 
            println(value) 
        }
}        
simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") } / "Caught ..." message printed         
    .collect()

Flow completion

  • execute action when flow collection completed
  • collector can use try/finally block
  • onCompletion operator
    • nullable Throwable parameter of lambda can be used
    • does not handle exception unlike catch
      -> the exception still flows downstream
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }

Launching Flow

  • need terminal operator to collect the launch the flow
  • launchIn terminal operator
    • launch collection of flow in a separate coroutine
    • must specify a CoroutineScope
      • ex. lifetime of Entity -> flow cancelled if entity terminates
    • returns a Job
  • cancellable operator to check for cancellation

0๊ฐœ์˜ ๋Œ“๊ธ€