Coroutine Asynchronous Flow

probehub·2022년 2월 16일

android coroutine

목록 보기
4/7
  • Asynchronous Flow : a stream of value that are asynchronously cloputed

flow { ... } : builder
emit(value) : trasmit a value
collect { ... } : receive the value, collect가 호출되어야 flow 가 데이터 trasmit 을 시작한다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() {

    runBlocking {
        println("receiving prime numbers")
        sendPrimes().collect {
            println("[${Date(Calendar.getInstance().timeInMillis)}] Received prime number $it")
        }
        println("Finished receiving numbers")
    }
}

fun sendPrimes(): Flow<Int> = flow {
    val primeList = listOf(2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
    primeList.forEach {
        delay(it * 100L)
        emit(it)
    }
}

prime number * 100 밀리초만큼 지연해서 emit 하도록 sendPrimes() 함수를 정의하였고, main 에서 실행한 결과에서 타임스탬프를 확인해보면 그대로 지연 출력되는걸 알 수 있다.

flow 생성하기

  • 각각의 값을 emit() 함으로써 생성하기
  • collection 타입은 flow 로 전환될 수 있다.
  • 파라미터를 받아서 flow 만들기 flowOf(vararg el: T)
fun main() {
    runBlocking {
        sendNumberByEmit().collect { println("Received number by emit($it)") }
        sendNumberByCollection().collect { println("Received number by Collection $it") }
        sendNumberByParams().collect { println("Received number by params $it") }
    }
}

fun sendNumberByEmit() = flow {
    for (i in 0..10) { emit(i) }
}

fun sendNumberByCollection() = listOf(1,2,3).asFlow()

fun sendNumberByParams() = flowOf(1,2,3)

flow properties

  • Cold : Flow 는 collect 함수가 호출되기 전까지는 동작하지 않는다. 이러한 특성을 cold 하다고 한다.
  • Cancellation : flow 는 스스로 작업을 취소할 수 없다. 해당 flow 를 포괄하고 있는 coroutine 의 작업이 취소 될때 같이 취소된다.

Operator

??operator : flow 를 input 으로 받거나, flow 를 전달 하거나, output 으로 flow 를 반환함.

  • 이러한 operator 역시 cold 하기 때문에 그 자체로 flow 를 동작시키지는 않는다.
  • 반환(return) 되는 flow 는 동기적으로 처리된다.

map operator

collection 에 map 을 적용하는것과 비슷하다

fun main() {
    runBlocking {
        mapOperator()
    }
}

suspend fun mapOperator() {
    (1..10).asFlow()
        .map {
            delay(500L)
            "mapping $it"
        }
        .collect {
            println(it)
        }
}
//mapping 1
//mapping 2
//mapping 3
//mapping 4
//mapping 5
//mapping 6
//mapping 7
//mapping 8
//mapping 9
//mapping 10

filter

역시 collection 에 filter 를 적용하는 것과 비슷하다.

fun main() {
    runBlocking {
        filterOperator()
    }
}

suspend fun filterOperator() {
    (1..10).asFlow()
        .filter {
            it % 2 == 0
        }
        .collect {
            println(it)
        }
}
// 2
// 4
// 6
// 8
// 10

transform

원하는 순간에 어떤 값이든 emit 할 수 있도록 해준다.

fun main() {
    runBlocking {
        transformOperator()
    }
}

suspend fun transformOperator() {
    (1..10).asFlow()
        .transform {
            if (it % 2 ==0) {
                emit("even ")
            }
            emit(it)
            emit("\n")
        }
        .collect {
            print(it)
        }
}

//1
//even 2
//3
//even 4
//5
//even 6
//7
//even 8
//9
//even 10

take

몇번째 값까지 가져갈 것인지를 지정한다.

fun main() {
    runBlocking {
        takeOperator()
    }
}

suspend fun takeOperator() {
    (1..10).asFlow()
        .take(2)
        .collect {
            println(it)
        }
}
//1
//2

terminal flow

flow 를 collection 으로 변환한다
이하의 operator 는 각각이 flow 를 실행되도록 만든다.

  • collect
  • toList
  • toSet
  • reduce
suspend fun reduceOperator() {
    val size = 10
    val factorial = (1..size).asFlow()
        .reduce { accumulator, value ->
        accumulator * value
        }
    println("Factorial of $size is $factorial")
}

flowOn

flow 가 emit 되는 context 를 바꾼다.

suspend fun flowOnOperator() {
    (1..10).asFlow()
        .flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }
}

Buffering

flow 가 값을 생성하는 시간 < flow 의 값을 받아서 처리하는 프로세스의 시간 인 경우 나중에 처리할 flow 의 값을 buffer 에 저장하는 것을 buffering 이라고 한다.

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            generate().collect { value ->
                delay(300)
                println(value)
            }
        }
        println("Collected in $time ms")
    }
}


fun generate() : Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

각각의 값들이 emit 되기까지 100ms, 그리고 프로세싱에 300ms로 각 emit 마다 총 400ms 이 소요되도록 하였다. 따라서 measureTimeMillis {...} 를 통해 전체 프로세스의 경과 시간을 측정해보면 1200ms ~ 1300ms 사이의 시간이 소요되는 확인할 수 있다. flow 값 생성과 처리가 동기적으로 처리 된다는 것을 알 수 있다.

하지만 buffering 을 이용한다면? flow 값 생성과 처리가 비동기적으로 처리 되는 것과 같은 시간 감소 효과를 볼 수 있다.

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            generate()
                .buffer() // buffer 추가
                .collect { value ->
                delay(300)
                println(value)
            }
        }
        println("Collected in $time ms")
    }
}

이때 1000ms ~ 1100ms 시간이 소요되어 전체 처리 시간이 감소하는 효과가 있다.

composing flow

zip : 다른 flow 와 순서가 일치하는 값을 처리할 수 있다.

fun main() {
    runBlocking {
        val eng = flowOf("one", "two", "three")
        val french = flowOf("un", "deux", "troix")
        eng.zip(french) { a, b ->
            "'$a' in French is '$b'"
        }.collect {
            println(it)
        }
    }
}

combine : 두 flow 간의 가장 마지막 값을 처리할 수 있다.

fun main() {
    runBlocking {
        val numbers = (1..5).asFlow()
           .onEach { delay(300L) }
        val values = flowOf("one", "two", "three", "four", "five")
            .onEach {
                delay(400L)
            }
        numbers.combine(values) { a, b ->
            "$a -> $b"
        }.collect {
            println(it)
        }
    }
}
//1 -> one
//2 -> one
//2 -> two
//3 -> two
//3 -> three
//4 -> three
//5 -> three
//5 -> four
//5 -> five

Exception handling

try / catch

전통적인 try / catch 방법

fun main() {
    runBlocking {
        tryCatch()
    }
}

suspend fun tryCatch() {
   try {
       (1..3).asFlow()
           .onEach { 
           check(it != 2) // check(boolean) false 면 throw IllegalStateException
           }
           .collect { println(it) }
   } catch (e: IllegalStateException) {
       println("Caught exception $e")
   }
}

catch()

전통적인 방법에 비해 코드가 간결해지지만, catch() operator 가 붙기 이전에 발생한 예외에 대해서만 잡아내기 때문에 주의가 필요함. 하지만 이러한 특성 덕분에 각각의 point 에서 원하는 exception 을 잡아내는데 이용할 수 있음.

fun main() {
    runBlocking {
        catch()
    }
}

suspend fun catch() {
    (1..3).asFlow()
        .onEach { check(it != 2) }
        .catch { e -> println("Caught exception $e") }
        .collect { println(it) }
}

onCompletion()

try / catch 에서 finally 와 비슷한 역할을 한다.

fun main() {
    runBlocking {
        onCompletion()
    }
}

suspend fun onCompletion() {
    (1..3).asFlow()
        .onEach { check(it != 2) }
        .onCompletion { e ->
            if (e != null) {
                println("Flow completed with exception $e")
            } else {
                println("Flow completed successfully")
            }
        }
        .catch { e -> println("Caught exception $e") }
        .collect { println(it) }
}
profile
개발자

0개의 댓글