channel api : 코루틴 간에 primitive 통신을 위해 추가된 API
channel 로 보내진 모든 값들은 오직 한 번만 수신할 수 있다.
channel interface
interface SendChannel<in E> {
suspend fun send(element : E)
fun close() : Boolean
interface REceiveChannel<out E> {
suspend fun receive() : E
fun cancel (cause : CancellationException? = null)
}
interface Channel<E> : SendChannel<E>, ReciveChannel<E>
produce 함수를 통해 channel에 값을 전송하자. 단순히 send를 통해 channel에 전송할 경우에는 exception 발생 시, 수신측에서 timeout과 같은 제약조건을 두지 않는 이상 영원히 suspend되기 때문이다.
fun CoroutineScope.produceNumbers(
max : Int
) : ReceiveChannel<Int> = produce {
var x = 0;
while (x < 5) send(x++)
}
ReceiveChannel 을 반환값으로 설정한 덕분에, close
함수는 호출될 것이다.
send
함수는 절대 중지되지 않는다.send
메소드는 중지된다.이러한 option을 사용하기 위해서는 위에서 구현한 produce
를 사용할 수 없다. Channel
함수를 사용해서 option을 구현해야한다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
repeat(5) { index ->
channel.send(index * 2)
delay(100)
println("Sent")
}
channel.close()
}
val channel = Channel<Response>(
capacity,
onUndeliveredElement = { resource ->
resource.close()
}
)
}
val resourceToSend = openResource()
channel.send(resourceToSend)
vla resourceReceived = channel.receive()
try{
// work with received resource
} finally {
resourceReceived.close()
}
위의 메소드는 기본적으로 consume되지 않는 요소들을 감지하고 적절한 액션을 취할 수 있게 해준다.
Flow
객체의 에러를 핸들링(retrying..)하거나 로깅하는데 유용하다.