#Flow : 코틀린 언어에서 제공해주는 비동기 데이터 스트림을 전달 할 수 있는 API입니다. 비동기 데이터 스트림이란, / [1]시간의 흐름에서 여러 데이터 조각들이 / [2]비동기적으로 생성되고 처리되는 / [3]데이터의 연속적인 흐름을 / 의미한다.
즉, 데이터가 일정한 간격 또는 순서 없이 메인 스레드를 차단하지 않은채로 별도의 프로세스 또는 스레드에서 처리 될 수 있음을 도와줄 수 있게 해준다.
#Flow는 hot stream과 cold stream을 동시에 가지고 있어.
-핫 스트림은 데이터를 소비하는 곳에서 소비를 하든 말든 데이터를 생성한다.
-Flow에서는 SharedFlow과 StateFlow가 있다. SharedFlow는 이전에 구독을 하였든 안 하였든 리스너들이 동일한 데이터 이벤트를 받을 수 있게 방출하며 , StateFlow는 항상 최신 상태의 데이터값을 방출해줘서 상태가 변할 때 마다 업데이트 된다.
-콜드 스트림은 데이터를 소비하는 곳에서 소비하기 전까지는 데이터의 흐름을 방출하지 않고, 소비를 하려고 할 때 데이터의 흐름을 방출하게 된다. 그렇다면 지연되고 있는 데이터를 소비하려고 하는지 아닌지 Flow단에서 어떻게 알 수 있을까? 정답은 collect()이다. collect()를 사용하게 되면 Flow에서 데이터를 emit하게 된다. 기본적인 Flow는 모두 콜드 스트림이다.
#비동기 데이터 스트림 특징
-비동기적 처리 : 데이터 스트림은 메인 스레드를 차단하지 않고, 비동기적으로 처리된다. 즉, 데이터가 도착할 때마다 비동기적으로 데이터를 처리 할 수 있다.
-시간적 분산 : 데이터가 한 번 오고 끝나는 것이 아니라 시간의 흐름에 걸쳐서 하나 하나의 조각들로서 데이터 흐름을 구성한다.
-연속성 : 데이터가 단일값이 아니라 연속적인 값들의 흐름으로 나타나진다.
-FlowBuilder란, flow를 만들어주는 역할을 하는 것이다. FlowBuilder는 'emit을 명시적으로 지정해줘야 요소를 방출할 수 있는 빌더'와 '빌더 자체에 emit 함수가 내장되어 있어서 명시적으로 emit을 작성하지 않아도 요소를 방출 할 수 있는 빌더' 이렇데 두가지의 종류로 나뉜다.
-'flow'는 명시적으로 emit을 작성해야지 flow 요소들을 방출할 수 있다.
-'flowOf' , 'asFlow'는 자체적으로 emit 함수를 내장하고 있다. 따라서 요소를 방출하는 것을 코드상에서 명시적으로 구현하지 않아도 된다.
-flowOf,asFlow 함수를 정의할 때 사용되는 flow{} 블록이 있는데 , 해당 flow{}는 FlowBuilder에서 사용되는 flow{} 블록과는 다른 개념이다. FlowBuilder인 flow{}는 SafeFlow이고 flowOf,asFlow 함수에서 사용되는 flow{}는 unsafeFlow이다.
-위 내용에 대한 설명은 각각의 FlowBuilder 코드를 보면서 아래에서 설명하겠다.
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
-위 코드는 flow{}를 정의하는 코드이다. 코드상에 보이는 것처럼 flow의 후행 람다식 코드 블락은 suspendScope이기 때문에 suspend function을 작성 할 수 있다.
-SafeFlow는 개발자가 안전하게 Flow를 생성할 수 있도록 설계된 고수준의 API이다.
-flow{} 자체는 public fun이기 때문에 flow{}를 사용해서 일반함수를 정의하는데에 사용하는 것은 가능하다.(하단의 예시코드 참고)
fun flowSomething() : Flow<Int> = flow {// Flow<Flow가 방출(emit)할 값의 타입> , 방출이란 스트림의 값을 A로 보내는 흘려보내는 것을 말한다.
repeat(10) {
emit(Random.nextInt(0 , 500))
delay(10L)
}
}
fun main() = runBlocking {
// Flow의 요소들을 사용하기 위해서는 종단 연산자를 사용해서 Flow의 요소들을 사용 할 수 있다.
// Flow는 콜드스트림이기 때문에 요청단에서 종단연산자를 호출해야 값을 방출한다.
// 콜드스트림 : 용청이 있는 경우에 1:1로 값을 전달한다.
flowSomething()
.collect {value ->
println(value)
}
}
public fun <T> flowOf(vararg elements: T): Flow<T> = "[A]"flow {
for (element in elements) {
emit(element)
}
}
-위의 코드는 flowOf를 정의한 코드이다. 보이는 것처럼 함수 자체에서 emit을 사용하고 있다. 또한 elementes가 제너릭 타입으로 선언되었기 때문에 모든 타입을 Flow의 요소로 만들 수 있다.
-그러나 주의할 점은 위에서 사용되는 flow코드 블락이 unsafeFlow라는 점인데 아래 코드가 위에서 사용된 [A]flow가 정의된 함수이다. 이는 unsafeFlow로서 내부 API용으로 공개되었으며, 사용자가 직접 사용하는 것이아니라 특정 사례에서만 사용 할 수 있는 것이며 , 해당 함수가 선언된 모듈내에서만 사용 할 수 있다.
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
fun main() = runBlocking {
println("flowOf")
flowOf(1, 2, "3", 4, 5).collect { value ->
// emit이 flowOf가 정의될 때 사용되었으므로 emit 코드 작성 할 필요 없음.
// flowOf()의 parameter에는
println(value)
}
println("flow")
}
public fun <T> Iterable<T>.asFlow(): Flow<T> = "[B]"flow {
forEach { value ->
emit(value)
}
}
-위 코드는 asFlow를 정의한 코드이다. 마찬가지로 [B]에서 사용된 flow는 unsafeFlow이며, emit 또한 내장되어 있어 emit을 별도로 작성할 필요가 없다.
-asFlow()는 컬렉션||시퀀스를 사용하여 Flow를 만들 수 있다.
fun main() = runBlocking {
println("asFlow")
listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
println(value)
}
(6..10).asFlow().collect {
println(it)
}
}
-Flow 중간연산자 : Flow의 요소들의 값이 방출될 때 각각의 요소들에 대하여 가공 할 수 있게 도와주는 연산자.
-Flow의 중간연산자는 다른 중간 연산자들과 함께 사용하고 마지막에는 종단연산자를 사용해서 Flow 요소들을 이용 할 수 있다 : 즉 , 중간 연산자는 Return 값이 존재하지 않는다.
-transform 연산자를 제외하고는 중단연산자 내부에 emit 함수가 내장되어 있어서 별도로 emit을 하지 않아도 된다. 그리고 중단연산자를 거친 Flow는 항상 새로운 Flow 요소를 만든다.
-Flow + Map : Flow가 방출한 데이터 스트림을 람다식에서 각각의 개별 요소에 대한 처리를 한 후 새로운 Flow의 요소로 변환 시킨다. : Flow 요소를 하나 가져와서 하나씩 가공해
-이때, 이런 변환 작업은 순차적으로 수행이 되고 , 각각의 개별 요소에 대해서 독립적으로 발생한다.
fun flowSomething() : Flow<Int> = flow {
repeat(10) {
emit(Random.nextInt(0 , 500))
delay(10L)
}
}
fun main() = runBlocking {
flowSomething().map {
"$it $it"
}.collect { value ->
println(value)
}
}
-Flow + filter : filter는 조건에 맞는 요소만 남기게 된다. + filter 뒤에 들어가는 조건을 '술어'라고도 부르기도 하고 , predicate라고 부르기도 한다.
fun main() = runBlocking {
(1..20).asFlow().filter {
(it % 2) == 0
}.collect {
println(it)
}
}
-Flow + filterNot : Flow + filter{}의 정반대 개념으로서 {}에 들어간 술어와 일치하지 않는 Flow의 요소들만 새로운 Flow로 반환한다.
-술어(predicate)를 수정해서 새로운 Flow를 반환 할 수 있지만 , 정반대 개념이라면 간편하게 filterNot{}을 통해서 새 Flow를 반환 할 수 있다.
fun main() = runBlocking {
(1..20).asFlow().filterNot {
(it % 2) == 0
}.collect {
println(it)
}
}
-Flow + transform : Flow의 요소들을 복잡한 전환 작업을 통해서 새로운 Flow 요소로 변환하여 방출 할 수 있다. (Flow 표준 연산자들 보다 더 확장적인 연산을 가능하게 한다.)
-Flow 요소를 사용자 정의 조건문등의 여러 로직을 적용해 자유롭게 변환 가능하며 여러 값을 emit() 할 수 있다. 여러값을 emit()할때는 먼저 emit된 값부터 종단연산자로 들어간다.
-주의! 다른 중간연산자들과 다르게 별도로 emit을 해줘야 한다.
suspend fun someCalc(i : Int) : Int {
delay(100L)
return i * 2
}
fun main() = runBlocking {
(1..20).asFlow().transform {
emit(it)
emit(someCalc(it))
}.collect {
println(it)
}
}
-Flow + take : Flow 요소의 일부만 취할 수 있다. (기준은 처음부터 잡고 특정 개수의 'Flow 요소'만 취한다.) 즉, 처음부터 방출되는 순서로 take()의 parameter의 개수만큼만 새로운 Flow 요소로 반환한다.
suspend fun someCalc5(i: Int): Int {
delay(100L)
return i * 2
}
fun main() = runBlocking {
(1..20).asFlow().transform {
emit(it)
emit(someCalc5(it))
}.take(7)
.collect {
println(it)
}
}
-Flow + takeWhile : takeWhile의 코드 블락 내부에 있는 조건을 만족하는 동안에만 emit() 할 수 있다.
suspend fun someCalc6(i: Int): Int {
delay(100L)
return i * 2
}
fun main() = runBlocking {
(1..20).asFlow().transform {
emit(it)
emit(someCalc6(it))
}.takeWhile {
it < 15
}.collect{
println(it)
}
}
-Flow + drop : Flow 요소의 처음 몇개의 요소를 버린다. <-> take와 반대 : take는 처음 몇개의 요소만 방출
suspend fun someCalc7(i: Int): Int {
delay(100L)
return i * 2
}
fun main() = runBlocking {
(1..20).asFlow().transform {
emit(it)
emit(someCalc7(it))
}.drop(5)
.collect {
println(it)
}
}
-Flow + dropWhile : dropWhile의 코드 블락 내부에 있는 조건에 맞지 않을 때까지 Flow 요소들을 다 버리고 , 조건이 처음으로 맞지 않는 순간 부터 해당 요소와 나머지 요소들을 모두 방출
-이때 중요한 점은, 조건이 처음으로 맞지 않아서 방출이 이루어진 순간 이후로는 조건에 맞아도 값의 방출이 진행된다.
-예시코드(1)
suspend fun someCalc8(i: Int): Int {
delay(100L)
return i * 2
}
fun main() = runBlocking {
(1..20).asFlow().transform {
emit(it)
emit(someCalc8(it))
}.dropWhile {
it < 15
}.collect{
println(it)
}
}
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
-onEach는 Flow의 각 요소가 방출 될때마다 onEach{} 내부에 정의된 람다 함수가 호출되어, 해당 요소에 대한 특정 작업을 수행한다.
-Flow에서는 onEach를 통해서 어떠한 이벤트를 처리 할 수 있다. : Listener를 설정해서 콜백을 등록하는 것보다 훨씬 간편한다.
-그러나 onEach에는 이벤트 처리를 하기에는 치명적인 단점이 있는데 , collect가 Flow의 스트림이 종료 될때까지 기다리기 때문에 , 이벤트 감시하다가 메인스레드에서 코드 진행이 멈추게 된다. 따라서 collect{}는 이벤트를 위해서는 사용 할 수 없다. 자세한건 맨 아래 FlowLaunching 파트에 있다.
fun events() : Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
events()
.onEach { event -> println("Event : $event") }
.collect {
}
println("Done")
}
-종단 연산자란 방출된 Flow의 요소들을 처리함으로서 Flow를 끝내는 연산자이다. terminal operator라고도 불리며 특정 값 , 컬렉션 등의 결과값을 Return한다.
-종단 연산자는 Flow의 요소를 받아서 1개씩 처리하는 suspend function이다. (Flow 요소를 단 한개씩 처리한다는 것이 아주 중요한 개념인것이야.)
-가장 기본적인 종단 연산자로서 걍 Flow요소가지고 노는거야.
-Flow + reduce : Flow 요소를 누적으로 처리한 후 하나의 Flow 요소로 방출한다.
-paramter로 들어갈 수 있는 값은 두개뿐이야.
-예시코드(1)
fun main() = runBlocking {
val value = (1..10)
.asFlow()
.reduce { a, b ->
a + b
}
println(value)
}
-Flow + fold : 초기값이 있다는 것만 제외하면 reduce 연산자와 동일한 기능을 한다.
fun main() = runBlocking {
val value = (1..10)
.asFlow()
.fold(10) { a , b ->
a + b
}
println(value)
}
-Flow + count : 술어를 만족하는 Flow 요소의 갯수를 세서 알려준다.
fun main() = runBlocking {
val counter = (1..10)
.asFlow()
.count{
(it % 2) == 0
}
println(counter)
}
-Flow + toList : Flow의 요소들을 리스트 형태로 반환한다. 리스트 형태로 변형될 때 [1]Flow의 요소의 순서를 유지한채로 , [2]중복 허용하여 리스트로 반환한다.
-따라서 순서가 중요하고, 중복된 요소들도 모두 포함시키고 싶을 때 사용한다.
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 2, 4, 3, 4, 5)
val numbersList = numbersFlow.toList()
println(numbersList)
}
-Flow + toSet : Flow의 요소들을 set 형태로 반환한다. set 형태로 변형될 때 [1]순서는 유지되지 않고 , [2]유일한 값들만 포함시키고 싶을 때 반환한다.
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 2, 4, 3, 4, 5)
val numbersList = numbersFlow.toSet()
println(numbersList)
}
-Flow + first : Flow에서 방출된 요소들 중 첫번째 요소만 반환한다. 즉, Flow의 첫번째 요소만 알고 싶을 때 사용한다. 만약 Flow가 요소를 방출하지 않으면 "NoSuchElementException" 에러를 발생시킨다.
fun main() = runBlocking {
val emptyValue = flowOf<Int>()
val firstValue = flowOf(99, 2, 3)
try {
println(emptyValue.first())
} catch (e: NoSuchElementException) {
println("Flow에 요소가 없어서 예외가 발생했습니다: ${e.message}")
}
}
-Flow + single : Flow가 단 하나의 요소만 방출할 것이라고 예상 될 때 사용한다. 만약 하나 초과의 요소를 반환하면 "IllegalArgumentException" 에러를 발생시킨다. 따라서 Flow가 최대 하나의 요소만 방출하는 것이 확실할 때 사용한다.
-Flow + singleOrNull : Flow가 단 하나의 요소만 방출하면 해당 요소를 반환하고 , 아무 요소도 방출하지 않거나 , 두개 이상의 요소를 반환하면 'null'을 반환한다.
따라서 에러를 발생시키지 않고 안전하게 사용하고 싶을 때 사용한다.
fun main() = runBlocking {
val singleYes = flowOf(1)
val singleNo = flowOf(1, 2, 3)
try {
println(singleYes.single())
} catch (e: IllegalArgumentException) {
println("Flow에 요소가 하나 초과 있어어 예외가 발생했습니다: ${e.message}")
}
println(singleYes.singleOrNull())
}
-Flow는 기본적으로는 현재 Coroutine의 Context에서 실행된다. 하지만 Context를 flowOn을 통해서 변경 할 수 있다.
-Flow의 Context를 변경하는 이유는 [1]Flow의 요소 생성과 처리를 다른 Coroutine Context에서 실행하기 위해서 [2]'Flow 방출 스레드(emit()이 있는 스레드)'와 'Flow 종단 스레드' 분리 일렇게 총 두가지 이유다.
-예를들면 , [1]복잡한 데이터 처리작업은 백그라운드 스레드에서 실행하고 싶고, [2]Flow의 요소들을 종단연산자를 통해서 받는 작업은 기존의 CoroutineContext에서 실행하고 싶다면 , flowOn 연산자를 사용해서 Flow emit이 포함된 코드의 Context를 Dispatchers.Default||Dispatchers.IO 같은 백그라운드 스레드로 변경해서 처리하면 된다.
-flowOn은 UpStream에 있는 대상을 어떤 Context에서 실행시킬지 결정한다. DownStream은 종단 연산자가 실행되는 Coroutine의 Context 내부에서 실행된다.
-flowOn의 위치에 따라서 UpStream과 DownStream이 결정이 되고 (상대적인 위치야.)
flowOn을 통해서 Flow의 연산||emit이 실행될 Context를 변경시킬 수 있다 : 만약, flowOn이 두번 연속으로 사용되었다면, 최상단의 flowOn이 우선 순위이다.
-flowOn는 내부적으로 버퍼링 메커니즘이 존재한다. 이는, Flow의 방출 부분과 수집 부분이 서로 다른 Context에서 동시에 실행 될 수 있기 때문이다.
-참고로 Flow의 방출과 수집은 모두 Coroutine에서만 실행 될 수 있다 : val simple3 = simple3()는 객체를 선언하는것에 불과하므로 실제로 simple3()가 방출되고 방출된 simple3()를 수집하는 과정은 Coroutine에서 일어난다.
-flowOn의 버퍼링 메커니즘은 암시적이고 , 명시적으로 버퍼링 메커니즘을 사용하고 싶다면, .buffer()를 사용하면 된다.
fun log3(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple3(): Flow<Int> = flow {
for (i in 1..10) {
delay(100L)
log3("값 ${i}를 emit합니다.")
emit(i)
} // Dispatchers.IO의 UpStream -> Dispatchers.IO에서 실행된다.
}
.flowOn(Dispatchers.IO) // 위치
.map {
it * 2 // Dispatchers.Default의 UpStream -> Dispatchers.Default에서 실행된다.
}
.flowOn(Dispatchers.Default) // 위치
fun main() = runBlocking {
simple3()
.collect { // DownStream
value ->
log("${value}를 받음.")
}
}
-기존의 Flow를 사용할 때는 생산측과 수신측이 같은 속도로 움직일 수 없었다. 왜냐하면 emit을 통해서 종단연산자에 전달된 요소 하나에 대한 처리가 완료되야지 다음 요소를 emit 할 수 있는 직렬 구조이기 때문이다.
-따라서 emit측과 종단측의 구조를 병렬적으로 바꾸고 싶었고 방출된 요소를 저장하는 Buffer에 대한 개념이 대두되었다.
-Flow + buffer : 송신측이 더 이상 종단 연산자의 실행 완료를 기다리지 않고 , 방출과 종단 연산자가 병렬적으로 실행된다. 주의 : Flow의 요소 방출이 병렬적으로 일어나는 것이 아니야!
-buffer를 사용하지 않았던 지난 코드들에서 Flow는 요소 하나를 방출하고 방출된 요소 하나가 종단 연산자에서 받아서 처리되면 , 다음 요소를 방출하는 식으로 순차적인 실행 구조를 가졌다.
-하지만, 이는 방출과 수신이 직렬로 실행되서 실행 속도를 늦추는 구조적인 문제를 지니고 있었다. buffer를 사용하게 되면 방출된 요소가 수신 완료 될 때까지 기다리지 않고, 바로 바로 다음 순서의 요소들을 방출하고 , 수신측 또한 Flow의 각 요소들이 독립적으로 처리가 되어 방출과 수신이 병렬로 처리되기 때뮨에 실행 속도를 증가시킬 수 있게 되었다.
-buffer()는 방추된 요소들을 임시로 저장 할 수 있는 버퍼를 만들어서 , 방출된 요소들을 보관하고 수신측에서 임의의 요소를 받을 수 있는 준비가 되면 해당 요소를 넘겨준다.
-아래는 buffer() 적용 전 후 의 코드이다.
fun simple() : Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
val time= measureTimeMillis {
simple().collect { value ->
delay(300)
println(value)
}
}
println("Collected in ${time} ms")
}
fun simple2() : Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
val time= measureTimeMillis {
simple2().buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in ${time} ms")
}
-Flow + Conflate : 중간의 값을 융합(conflate) 할 수 있다. 내가 처리하지 못하고 있던 사이에 받았던 값들을 다 버린다.
-conflate는 buffer를 상속 받았으므로 , 방출과 수집이 독립적으로 일어난다. 즉, buffer에 기능이 추가 되었다고 생각해도 좋다.
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
-conflate()는 빠른 방출 속도와 느린 소비 속도 사이의 갭차를 해결하는 기능을 한다. 주로 방출의 속도가 소비의 속도 보다 빠를 때 사용한다.
-'Flow가 요소를 방출하는 속도'가 '방출된 요소가 소비되는 속도'보다 빠른 경우에, 처리되지 않은 요소들을 버려버린다. 즉, 종단 연산자가 요소를 처리하는 것이 준비 될 때까지 기다리지 않고, 준비가 안 되어있으면 그냥 해당 요소를 버려버린다. 이로 인해 항상 최신의 값을 소비 할 수 있는 장점이 있으며, 주로 실시간 데이터처리||빠른 데이터 스트림 처리에서 사용한다.
fun simple3() : Flow<Int> = flow {
for (i in 1..3) {
delay(100) // (100) 1 (200) 2 (300) 3 (400) 4 (500) 5 (600)
emit(i)
}
}
fun main() = runBlocking {
val time= measureTimeMillis {
simple3().conflate()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in ${time} ms")
}
> 1을 받아서 delay(300)동안 기다린 후 1을 처리한다. 이때 1을 처리하는 작업이 완료된 후
들어온 값만 받아서 처리한다. 1을 처리중일때 들어온 2는 버려진다.
-Flow + collectLatest : Flow 요소를 받아서 처리하는 와중에 새로운 값이 오면은 이전에 처리하던 내용들을 다 리셋하고 새로운 값만 처리한다. 그래서 결과적으로는 Flow의 마지막 요소만 처리한다.
-마지막 값이 올 때까지 기다리는 것이 아니라 Flow 요소들을 처리하는 와중에 새로운 Flow 요소가 들어오면 기존에 처리하고 있던 Flow 요소와 관련된 모든 작업을 리셋하는 방식으로 마지막 Flow 요소만 처리하는 것이야.
-collectLatest는 코드 내부적으로 buffer를 사용하고 있으므로 , buffer의 기능을 사용하고 있다.
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
fun simple4(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // (100) 1 (200) 2 (300) 3 (400) 4 (500) 5 (600)
emit(i)
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
simple4().collectLatest { value -> // 1 , 2 // 리셋 // 2 , 3 // 리셋 // 3만 처리
println("값 ${value}를 처리하기 시작합니다.")
delay(300)
println(value)
println("처리를 완료하였습니다.")
}
}
println("Collected in ${time} ms")
}
-두개의 다른 Flow가 방출하는 요소들을 결합 할 수 있다.
-Flow + Zip : 두개의 Flow가 있을 때 양쪽 Flow의 요소가 둘 다 준비가 되었을 때 두 요소를 묶어서 묶어서 새로운 Flow 요소를 만들어 낸다.
fun main() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("일", "이", "삼")
nums.zip(strs) { a, b -> "${a}은(는) $b" }
.collect {
println(it)
}
}
-Flow + combine : 두개의 Flow가 있을 때 양쪽 Flow 중 하나의 Flow에서 새로운 요소가 방출 될때마다 , 다른쪽의 Flow의 최신 요소와 결합하여 새로운 요소를 만들어서 방출한다. 만약, 반대쪽의 Flow가 아직 새로운 요소를 방출하지 않았으면 가장 최근에 방출된 요소를 반대쪽의 Flow의 최신 요소로 인식한다.
-만약 아예 반대쪽 요소가 최초의 방출도 아직 안 되어있다면 , 대기하다 방출되면 combine()이 실행된다.
fun main() = runBlocking<Unit> {// 1 일 // 2 일 // 3 일 // 3 이 // 3 삼
val nums2 = (1..3).asFlow().onEach { delay(100L) }
val strs2 = flowOf("일" , "이" , "삼").onEach { delay(200L) }
nums2.combine(strs2) { a , b -> "${a}은(는) $b"}
.collect {value ->
println(value)
}
}
-Flow Flattening이란, 플로우 평탄화라고하며 한 플로우의 각 요소를 다른 플로우로 변환하는 것을 말한다. ("Flow에서 Flow로 요소를 넘기는 경우"를 말한다.) <-> Flow에서 종단 연산자로 요소를 넘기는 경우
-Flow Flattening : 하나의 Flow가 다른 Flow를 방출하는 것을 의미한다. 주로 사용되는 연산자는 flatMapConcat , flatMapMerge , flatMapLatest 3가지이다.
-즉 , (A)Flow가 방출 한 값을 (B)Flow가 받아서 (C)Flow로 방출하면 (최종목적지)임의의 종단 연산자에서 해당 Flow 요소를 처리하는 방법론에 따라서 3가지로 분류된다.
-Flow에서 Flow로 요소를 넘길때는 [flatMapConcat : 받는 Flow가 준비가 되야지 넘길수 있거나][flatMapMerge : 준비가 안되도 넘길수 있거나] [flatMapLatest : 최신의 Flow가 넘겨질때마다 기존의 Flow는 취소 시킬 수 있거나] 이 세 가지의 종류
-Flow + flatMapConcat : 첫번째 요소에 대해서 Flattening 하고 나서 두번째 요소를 합친다. 즉, '순서대로 합친다'라고 생각하면 된다. (순차적인 처리가 필요할 때 사용한다.)
-FlowA.flatMapConcat { FlowB } : FlowA가 방출한 요소(들)을 FlowB에서 받아서 가공해서 새로운 Flow 요소를 순차적으로 방출한다. : 한 Flow의 모든 요소가 처리 될때까지 다음 Flow로 넘어가지 않는다.
-즉, FlowA와 FlowB를 순차적으로 연결해서 단일 Flow를 만드는 연산자이다. -> flatMapConcat의 의미에 영향을 받는 것은 FlowB이다.
-꼭 두개의 Flow에만 적용되는 것은 아니며 두개 이상의 Flow들을 순차적으로 연결하여 단일 Flow로 만드는 연산자이다.
-각각의 Flow에 대해서 별도의 비동기 작업을 수행한 후 그 결과를 단일 Flow로 모아서 방출하고 싶을 때 사용하면 된다. 예를 들면 FlowA의 요소들을 처리하는 FlowB를 만들고.. 이러한 결과를 순차적으로 연결하여 최종 결과값을 Flow의 요소로 반환하고자 할 때 사용하면 된다.
fun requestFlow(i : Int) : Flow<String> = flow {
emit("$i : First")
delay(500)
emit("$i : Second")
}
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapConcat {
requestFlow(it)
}
.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
-Flow + flatMapMerge : 첫 요소의 Flattening을 시작하고 바로 이어서 다음 요소의 Flattening을 시작한다.
-flatMapMerge 또한 flatMapConcat과 동일하게 각각의 Flow 요소에 대해서 새로운 FlowC를 만들어서 방출한다. 하지만 , 순차적으로 방출하는 것이 아니라 병렬적으로 Flow를 방출한다. : .flatMapMerge의 의미에 영향을 받는것은 FlowB이다.
-즉, FlowB가 처리하는 작업이 병렬적으로 진행되어 다른 요소들의 처리를 기다리지 않는다. (이런 점이 flatMapConcat과의 차이점이다.)
-FlowA.flatMapMerge { FlowB } : FlowA가 방출한 값을 FlowB에서 새로운 FlowC로 반환하는데, FlowC가 방출한 요소가 종단 연산자에서 처리 완료 될 때까지 기다리지 않고 , 바로 이어서 FlowC는 새로운 요소를 병렬적으로 방출한다. : 들어오는 족족 FlowB에서 처리해서 FlowC로 방출.(병렬적)
fun requestFlow2(i : Int) : Flow<String> = flow {
emit("$i : First")
delay(500)
emit("$i : Second")
}
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapMerge {
requestFlow2(it)
}
.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
-Flow + flatMapLatest : 다음 요소의 플레트닝을 시작하며 이전에 진행 중이던 플레트닝을 취소한다.
-FlowA에서 FlowB로 요소를 넘길 때 기존에 넘겨진 FlowA의 요소가 있다면 [FlowB가 FlowA의 요소를 받아서 처리하는 작업을 취소 시키고] , [FlowA의 새로운 요소만 받아서 FlowB에서 처리한다.]
fun requestFlow3(i : Int) : Flow<String> = flow {
emit("$i : First")
delay(500)
emit("$i : Second")
}
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapLatest {
requestFlow3(it)
}
.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
-Flow는 어디에서 예외가 발생하든 상관없이 모든 예외는 처리가 가능하다.
-Flow의 Exception의 투명성 : Flow 빌더 코드 블록 내에서 예외를 처리하는 것은 예외 투명성을 어기는 것이다. 왜냐하면, Flow 내부에서 예외를 처리해버리면 , Flow 빌더 코드 블록 밖에서는 예외가 발생 하였는지 알 수 없기 때문이다. 따라서 Flow에서는 catch 연산자를 사용하여 예외를 처리하는 것을 권한다.
fun simple() : Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect {value ->
println(value)
check(value <= 1) { "Collected $value"}
}
} catch (e : Throwable) {
println("Caught $e")
}
}
fun simple2(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
.map {
check(it <= 1) { "Crashed on $it" }
"string $it"
}
fun main() = runBlocking<Unit> {
try {
simple2().collect { value -> println(value) }
} catch (e : Throwable) {
println("Caught $e")
}
}
-catch 블록에서 [어떤 예외가 발생 하였는지 받을 수 있고, 그 예외를 처리하는 것이다.]
-catch 블록에서 [예외를 새로운 데이터로 만들어 emit을 하거나] , [다시 예외를 던지거나] , [로그를 남길 수 있다.]
fun simple3(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
.map {
check(it <= 1) { "Crashed on $it" }
"string $it"
}
fun main() = runBlocking<Unit> {
simple3()
.catch { e -> emit("Caught $e") }
.collect { value -> println(value) }
}
-Flow의 catch 투명성이란 , "catch 연산자는 UpStream(catch 연산자를 사용하기 전의 코드)에만 영향을 미치고 DownStream(catch 연산자를 사용하고 난 후의 코드)에는 영향을 미치지 않는다."는 것이다.
fun simple4(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple4() // catch가 영향을 미칠 수 있어.
.catch { e -> print("Caught $e") } // UpStream에 영향(o) DownStream에 영향(x)
// 여기 아래로는 catch가 영향을 미치지 않는다.
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
-[예외가 발생하거나][Flow의 동작이 끝난] 다음에 즉 , Flow가 완료된 이후에 동작을 해야 할 필요가 있을 수 있다. 즉, Flow가 모든 요소를 방출하고 난 후 필요한 추가 작업(ex:리소스 해제)을 하는 방법이 있다.
-try~finally 문의 finally 블록에 최종적으로 수행할 코드를 작성해주면 된다.
fun simple() : Flow<Int> = (1..3).asFlow()
fun main() = runBlocking {
try {
simple().collect{ value -> println(value)}
} finally {
println("Done")
}
}
-onCompletion { 이 onCompletion 코드 블록 내부에 완료 된 후 실행 될 코드 작성 } 을 사용하여 선언적으로 완료 처리를 할 수 있다.
-.onCompletion{}과 함께 .catch{}를 사용하면 둘 다 cause를 받아오는데 여기서 cause는 오류메시지를 나타낸다. 이런식으로 둘다 함께 사용하면 에러메시지에 대한 내용도 간편하게 알 수 있다.(예시코드2 참고)
-onCompletion의 장점은 종료 처리를 할 때 예외가 발생되었는지에 대한 여부를 알 수 있다. try~catch~finally 사용하면 catch에서는 Flow의 예외를 알 수 있지만 , finally에서는 알 수 없었던 단점 해결.
fun simple2() : Flow<Int> = (1..3).asFlow()
fun main() = runBlocking {
simple2()
.map {
if (it > 2) {
throw IllegalStateException()
}
it + 1
}
.catch { e-> emit(-99) }
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
// onCompletion{}과 catch{}의 코드 블록에서 받아오는 it은 Flow 처리 중 발생한 예외를 참조한다.
fun simple3() : Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking {
simple3()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") else { println("Flow completed.") } }
.catch { cause -> println(cause) }
.collect { value -> println(value) }
}
-Flow + launchIn : launchIn을 이용하면 별도의 코루틴에서 Flow를 런칭 할 수 있다. 이로서 Flow가 비동기적으로 실행되기 때문에 MainThread에서 코드가 멈추지 않는다.
-launchIn의 코드는 아래와 같은데 종단 연산자인 collect가 내장 함수로 등록되어있어서 별도로 종단 연산자를 사용하지 않아도 된다.
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
fun events2() : Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
events2()
.onEach { event -> println("Event : $event ${Thread.currentThread().name}") }
.launchIn(this)
println("Done ${Thread.currentThread().name}")
}