a stream of value that are asynchronously cloputedflow { ... } : builder
emit(value) : trasmit a value
collect { ... } : receive the value, collect가 호출되어야 flow 가 데이터 trasmit 을 시작한다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
fun main() {
runBlocking {
println("receiving prime numbers")
sendPrimes().collect {
println("[${Date(Calendar.getInstance().timeInMillis)}] Received prime number $it")
}
println("Finished receiving numbers")
}
}
fun sendPrimes(): Flow<Int> = flow {
val primeList = listOf(2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
primeList.forEach {
delay(it * 100L)
emit(it)
}
}
prime number * 100 밀리초만큼 지연해서 emit 하도록 sendPrimes() 함수를 정의하였고, main 에서 실행한 결과에서 타임스탬프를 확인해보면 그대로 지연 출력되는걸 알 수 있다.
fun main() {
runBlocking {
sendNumberByEmit().collect { println("Received number by emit($it)") }
sendNumberByCollection().collect { println("Received number by Collection $it") }
sendNumberByParams().collect { println("Received number by params $it") }
}
}
fun sendNumberByEmit() = flow {
for (i in 0..10) { emit(i) }
}
fun sendNumberByCollection() = listOf(1,2,3).asFlow()
fun sendNumberByParams() = flowOf(1,2,3)
??operator : flow 를 input 으로 받거나, flow 를 전달 하거나, output 으로 flow 를 반환함.
collection 에 map 을 적용하는것과 비슷하다
fun main() {
runBlocking {
mapOperator()
}
}
suspend fun mapOperator() {
(1..10).asFlow()
.map {
delay(500L)
"mapping $it"
}
.collect {
println(it)
}
}
//mapping 1
//mapping 2
//mapping 3
//mapping 4
//mapping 5
//mapping 6
//mapping 7
//mapping 8
//mapping 9
//mapping 10
역시 collection 에 filter 를 적용하는 것과 비슷하다.
fun main() {
runBlocking {
filterOperator()
}
}
suspend fun filterOperator() {
(1..10).asFlow()
.filter {
it % 2 == 0
}
.collect {
println(it)
}
}
// 2
// 4
// 6
// 8
// 10
원하는 순간에 어떤 값이든 emit 할 수 있도록 해준다.
fun main() {
runBlocking {
transformOperator()
}
}
suspend fun transformOperator() {
(1..10).asFlow()
.transform {
if (it % 2 ==0) {
emit("even ")
}
emit(it)
emit("\n")
}
.collect {
print(it)
}
}
//1
//even 2
//3
//even 4
//5
//even 6
//7
//even 8
//9
//even 10
몇번째 값까지 가져갈 것인지를 지정한다.
fun main() {
runBlocking {
takeOperator()
}
}
suspend fun takeOperator() {
(1..10).asFlow()
.take(2)
.collect {
println(it)
}
}
//1
//2
flow 를 collection 으로 변환한다
이하의 operator 는 각각이 flow 를 실행되도록 만든다.
suspend fun reduceOperator() {
val size = 10
val factorial = (1..size).asFlow()
.reduce { accumulator, value ->
accumulator * value
}
println("Factorial of $size is $factorial")
}
flow 가 emit 되는 context 를 바꾼다.
suspend fun flowOnOperator() {
(1..10).asFlow()
.flowOn(Dispatchers.IO)
.collect {
println(it)
}
}
flow 가 값을 생성하는 시간 < flow 의 값을 받아서 처리하는 프로세스의 시간 인 경우 나중에 처리할 flow 의 값을 buffer 에 저장하는 것을 buffering 이라고 한다.
fun main() {
runBlocking {
val time = measureTimeMillis {
generate().collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
}
fun generate() : Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
각각의 값들이 emit 되기까지 100ms, 그리고 프로세싱에 300ms로 각 emit 마다 총 400ms 이 소요되도록 하였다. 따라서 measureTimeMillis {...} 를 통해 전체 프로세스의 경과 시간을 측정해보면 1200ms ~ 1300ms 사이의 시간이 소요되는 확인할 수 있다. flow 값 생성과 처리가 동기적으로 처리 된다는 것을 알 수 있다.
하지만 buffering 을 이용한다면? flow 값 생성과 처리가 비동기적으로 처리 되는 것과 같은 시간 감소 효과를 볼 수 있다.
fun main() {
runBlocking {
val time = measureTimeMillis {
generate()
.buffer() // buffer 추가
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
}
이때 1000ms ~ 1100ms 시간이 소요되어 전체 처리 시간이 감소하는 효과가 있다.
fun main() {
runBlocking {
val eng = flowOf("one", "two", "three")
val french = flowOf("un", "deux", "troix")
eng.zip(french) { a, b ->
"'$a' in French is '$b'"
}.collect {
println(it)
}
}
}
fun main() {
runBlocking {
val numbers = (1..5).asFlow()
.onEach { delay(300L) }
val values = flowOf("one", "two", "three", "four", "five")
.onEach {
delay(400L)
}
numbers.combine(values) { a, b ->
"$a -> $b"
}.collect {
println(it)
}
}
}
//1 -> one
//2 -> one
//2 -> two
//3 -> two
//3 -> three
//4 -> three
//5 -> three
//5 -> four
//5 -> five
전통적인 try / catch 방법
fun main() {
runBlocking {
tryCatch()
}
}
suspend fun tryCatch() {
try {
(1..3).asFlow()
.onEach {
check(it != 2) // check(boolean) false 면 throw IllegalStateException
}
.collect { println(it) }
} catch (e: IllegalStateException) {
println("Caught exception $e")
}
}
전통적인 방법에 비해 코드가 간결해지지만, catch() operator 가 붙기 이전에 발생한 예외에 대해서만 잡아내기 때문에 주의가 필요함. 하지만 이러한 특성 덕분에 각각의 point 에서 원하는 exception 을 잡아내는데 이용할 수 있음.
fun main() {
runBlocking {
catch()
}
}
suspend fun catch() {
(1..3).asFlow()
.onEach { check(it != 2) }
.catch { e -> println("Caught exception $e") }
.collect { println(it) }
}
try / catch 에서 finally 와 비슷한 역할을 한다.
fun main() {
runBlocking {
onCompletion()
}
}
suspend fun onCompletion() {
(1..3).asFlow()
.onEach { check(it != 2) }
.onCompletion { e ->
if (e != null) {
println("Flow completed with exception $e")
} else {
println("Flow completed successfully")
}
}
.catch { e -> println("Caught exception $e") }
.collect { println(it) }
}