코루틴의 비동기 스트림 API를 지원하는 Flow와 Channel에 대해서 알아보자.
채널은 일종의 파이프라인이다. 채널을 열고 한쪽에서 값을 보내면(send) 다른 쪽에서 수신하는(receive) 개념이다.(이렇게 채널을 생성하는 패턴을 파이프라인이라고 한다.) Channel은 여러 방향에서 데이터를 던지고 받는 형식으로 코루틴끼리의 데이터를 전달하기 위해 사용한다.
또한, Channel은 수신하기 전에 데이터를 보내는 특성이 있으며 Hot Stream
이라고도 한다.
구조는 BlockingQueue와 비슷하며, 동일하게 Thread-Safe한 형태의 구조를 가지고 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic,
// we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
[Result]
1
4
9
16
25
Done!
값을 받을 때마다 출력하며, 두 코루틴이 채널을 통해서 값을 주고 받을 수 있다. 기본적으로 send()
와 receive()
로 데이터를 주고 받을 수 있으며 for문을 통해서 receive()를 대신할 수도 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
[Result]
1
4
9
16
25
Done!
위의 코드와 다른 점은 close()
가 추가된 부분이다. close()가 없다면 "Done!"을 출력하지 않으며 Channel 파이프라인이 종료되지 않는다.
또 다른 방법으로는 producer-consumer
패턴을 이용하는 방법이며 produce builder를 통해 코루틴을 작성한다. consumeEach
가 for문을 대체한다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
결과는 동일하다.
[Channel에 대해서 조금 더 살펴보기]
Channel에서 consumeAsFlow
를 사용하면 데이터를 Flow
형식으로 받는게 가능하다. 다만, 소비를 Cold Stream(Flow)으로 할 뿐 데이터를 생산(즉, 배출)하는 것은 Hot Stream(Channel)이기에 사용을 기다리지 않고 한번에 배출된다. 그래서 Flow
처럼 여러 곳에서 동일한 데이터를 받는 것은 불가능하다.
val channel = produce<Int> { for (x in 0..5) send(x) }
val flowFromChannel = channel.consumeAsFlow()
flowFromChannel.collect{
println("1 $it")
}
flowFromChannel.collect{
println("2 $it")
}
Result
1 0
1 1
1 2
1 3
1 4
2 4
val flow = (0..5).asFlow()
flow.collect{
println("1 $it")
}
flow.collect{
println("2 $it")
}
Result
1 0
1 1
1 2
1 3
1 4
2 0
2 1
2 2
2 3
2 4
첫번째 코드인 consumeAsFlow
을 통해 만들어진 Flow는 첫번째 collect만 동작되었고 두번째 코드인 Flow는 모든 collect가 동작하는 것을 확인할 수 있다.
consumeAsFlow()가 필요한 경우는 다양한 연산자들(find, map, filter, first 등)을 사용하기 위함이다.
여러 방향에서 데이터를 던지고 받는 Channel의 특성을 지칭하는 용어로 Fan-In, Fan-Out을 사용한다.
for-loop와 consumeEach의 동작이 다른데 아래 코드를 통해서 확인해보자. 받는 곳이 두 곳일 때, 한쪽이 에러가 발생한 상황에 대한 코드이다.
// consumeEach
launch {
source.consumeEach {
println("1 $it")
}
}
launch {
try {
source.consumeEach {
throw Exception()
}
}catch (e : Exception){
...
}
}
----------------------------
// for-loop
launch {
try {
for (x in source) {
throw Exception()
println("1 $x")
}
}catch (e : Exception){
...
}
}
launch {
for (x in source) {
println("2 $x")
}
}
에러가 발생하지 않는 경우라면 동일한 동작을 하지만, 에러가 발생했을 때 결과가 달라진다.
consumeEach
의 경우 에러가 발생하면 채널 자체가 close
된다. 따라서 사용하는 곳이 모두 다 같이 종료된다.
for-loop
의 경우 에러가 발생한 for-loop
만 중단되고 채널은 종료시키지 않는다. 따라서 사용하고 있는 다른 곳들은 정상적인 동작을 이어간다.
가이드 문서에서는 for-loop를 완벽하게 안전한 것이라 표현하고 있으니 Fan-Out에서는 for-loop를 사용하는게 좋다.
버퍼의 사이즈를 직접 지정하거나 미리 지정된 형태를 통해 지정할 수 있다.
UNLIMITED : 제한이 Int.MAX_VALUE인 상태
RENDEZVOUS(Default) : 버퍼가 없는 상태
CONFLATED : 항상 최신의 값 하나만 가지고 있는 상태로 데이터가 있는 상태에서 새로운 데이터가 들어오면 이전 데이터는 소실됨
BUFFERED : 시스템이 정한 버퍼값(64개)을 가지고 있는 상태
Channel은 FIFO(First in First out)으로 동작하여 하나의 채널이 독점하지 않고 순차적으로 데이터를 가져간다.
val channel = Channel<Int>()
launch {
delay(100)
channel.send(0)
}
launch {
repeat(4){
val x = channel.receive()
println("1 $x")
channel.send(x+1)
}
}
launch {
repeat(4){
val x = channel.receive()
println("2 $x")
channel.send(x+1)
}
}
Result
1 0
2 1
1 2
2 3
1 4
2 5
1 6
2 7
위 코드를 보면 자기가 던진 데이터를 자기가 가져가지 않고 먼저 receive한 곳에서 가져가는 모습을 통해 아주 공평한 모습을 가지고 있다는 걸 알 수 있다.
Channel과 다르게 가져오는 시점을 지정할 수 있다. 우리가 원하는 시점에 데이터를 가져올 수 있으며 이러한 특성을 Cold Stream
이라고 한다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i * i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
[Result]
Calling simple function...
Calling collect...
Flow started
1
4
9
Calling collect again...
Flow started
1
4
9
이처럼 collect
하는 시점부터 데이터를 가져올 수 있다. collect 블록은 collect를 호출하기 전까지 수행되지 않는다.
Flow가 수행되는 스레드를 변경하고 싶다면 .flowOn()
을 통해서 디스패처를 적절하게 변경하면 된다. 또한, Flow는 map과 같은 다양한 연산자를 지원하기 때문에 필요에 따라서 데이터를 변환하여 사용하면 된다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return (request * request).toString()
}
fun main() = runBlocking<Unit> {
(1..5).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
[Result]
Calling simple function...
Calling collect...
Flow started
1
4
9
Calling collect again...
Flow started
1
4
9
위에서 본 것처럼 Channel과 Flow는 데이터를 가져오는 시점에 차이가 있다. Flow는 사용자가 Collect를 호출하는 시점에 원할때 가져올 수 있으며 Cold라고 부르며, 호출하기도 전에 데이터를 가져오는 Channel 같은 경우는 Hot이라고 부른다.