[TIL] ๐ผ24/04/18๐ผ#Kotlin Flow
Kotlin Flow
๐๋์๋๋ ์ง๋ ๋ธ๋ก๊ทธ ํฌ์คํ
๐์ฐธ๊ณ ์๋ฃ
Flow
- used to represent the stream of values that are being computed asynchronously
- Flows are cold
- similar to sequences
- code inside a flow builder does not run until the flow is collected
- Flow starts afresh every time it is collected
- flow collection can be cancelled when the flow is suspended in a cancellable suspending function (ex.
delay
)
- applied to an upstream flow -> returns a downstream flow
- runs quickly
- returns **definition of a new transformed flow
- intermediate flow operators are cold
- Transform operator
- Size-limiting operators(ex.
take
)
- cancel the execution of the flow when reached size limit
(coroutine cancellation is always performed by throwing an exception)
Terminal flow operators
- suspending functions that starts a collection of the flow
collect
- conversion to collections (ex.
toList
, toSet
)
- get value (ex.
first
, single
)
- reducing a flow value (ex.
reduce
, fold
)
Flows are sequential
Flow context
- collection of flow always happens in the context of the calling coroutine
= context preservation
- wrong way to switch context of the flow
- flow builder์ context preservation ์์ฑ ๋๋ฌธ์ ๋ค๋ฅธ context์์ ๊ฐ์ emitํ ์ ์์!
fun simple(): Flow<Int> = flow {
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100)
emit(i)
}
}
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
- right way to switch context of the flow =
flowOn
ํจ์ ์ฌ์ฉํ๊ธฐ
- flow works & emits in the background thread
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
Buffering
- use
buffer
operator on flow
-> run emmiting code of the flow concurrently with collecting code
- Conflation
- use a flow may not be necessary to process each value, instead, only the most recent ones
conflate
operator is used to drop emmitted intermediate value
- Processing the latest value
xxxLast
operators are used to cancel code in block on new value
Composing multiple flows
- Zip
- combine corresponding values of two flows
- Combine
- produce result at each emission from either flows
Flattening flows
- flattening a flow of flows(
Flow<Flow<T>>
)
- flatMapConcat, flattenConcat
- wait for inner flow to complete before starting to collect the next inner flow
- flatMapMerge, flattenMerge
- concurrently collect all the incoming flows -> merge values into a single flow
- flatMapLatest
- collection of the previous flow is cancelled as new flow is emitted
Flow exceptions
- collector can use
try/catch
block
-> any exception happening is caught
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
catch
operator
- preserves exception transparency
- encapsulates exception handling
- do not need
try/catch
block anymore
- catches only upstream exceptions
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") } / "Caught ..." message printed
.collect()
Flow completion
- execute action when flow collection completed
- collector can use
try/finally
block
onCompletion
operator
- nullable
Throwable
parameter of lambda can be used
- does not handle exception unlike
catch
-> the exception still flows downstream
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
Launching Flow
- need terminal operator to collect the launch the flow
launchIn
terminal operator
- launch collection of flow in a separate coroutine
- must specify a CoroutineScope
- ex. lifetime of Entity -> flow cancelled if entity terminates
- returns a
Job
cancellable
operator to check for cancellation