
코루틴은 비동기적인 작업을 서로 다른 스레드에서 동시에 작업할 수 있도록 설계 되어있습니다.
이때 하나의 데이터를 처리하기 위해서 여러 코루틴에서 동시에 작업을 한다면 공유 상태로 인한 충돌이 발생하여 원하지 않은 결과값을 받을 수 있으며, 코루틴만을 이용해서 순서 보장을 하게 된다면 동시성을 가지기 어려울 수 있습니다. 이를 해결하기 위해서채널(Channel)이라는 API를 제공하고 있습니다.
채널은 어떤 행위를 하는 것일까요?
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
public interface SendChannel<in E> {
public val isClosedForSend: Boolean
public suspend fun send(element: E)
public val onSend: SelectClause2<E, SendChannel<E>>
public fun trySend(element: E): ChannelResult<Unit>
public fun close(cause: Throwable? = null): Boolean
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
public interface ReceiveChannel<out E> {
public val isClosedForReceive: Boolean
public val isEmpty: Boolean
public suspend fun receive(): E
public val onReceive: SelectClause1<E>
public suspend fun receiveCatching(): ChannelResult<E>
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
public fun tryReceive(): ChannelResult<E>
public operator fun iterator(): ChannelIterator<E>
public fun cancel(cause: CancellationException? = null)
}
채널은 SendChannel, ReceiveChannel의 인터페이스를 구현한 하나의 인터페이스 입니다.
채널은 두개의 인터페이스를 상속받아 각 구현이 되어있기에 SendChannel, ReceiveChannel 중 하나의 타입으로 강제화 시켜 역할의 강제성을 부여하는 것이 가능합니다.
각 인터페이스의 함수인 send, receive를 보면 일시 중단 함수(suspsend)인 것을 확인할 수 있습니다.
채널의 간단한 예시로 각기 다른 코루틴에서 생성자와 소비자가 있어야하며, 생성자는 원소를 보내고 소비자는 원소를 받도록 되어있습니다.
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
println("producing next one")
delay(1000)
channel.send(index)
}
channel.close()
}
launch {
for (elemnet in channel) {
print(elemnet)
}
}
}
위 예시는 channel 객체를 생성하고, 1초 마다 send(index)를 호출해서 채널에 원소를 보내도록 되어있으며, 5개의 원소를 보내고 나서 채널를 닫는 close()를 호출하고 있습니다.
소비자는 for 문을 사용해서 channel에 값이 들어올 때까지 코루틴을 일시 중단 하고 해당 원소를 받으면 출력 하도록 되어있습니다.
하지만 위 방식의 문제점은 예외 등 여러 상황에서 채널을 닫는 걸 깜빡하기 쉽다는 것입니다.
채널을 닫지 않는다면 소비자의 코루틴은 무한 일시 중단 상태가 될 수 있고, GC에 의해서 처리되지 못할 수 도 있기에 사용이 끝난 코루틴은 꼭 닫을 수 있도록 해야합니다.
이러한 문제는 해결하기 위해 코루틴에서는 produce 함수를 제공하고 있습니다.
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
ReceiveChannel 타입을 반환하는 produce 사용한다면 내부 블록의 작업이 모두 끝나거나, 취소 된다면 onCompleted, onCancelled 함수를 호출하여 채널을 close() 받드시 호출할 수 있도록 합니다.
fun main() = runBlocking {
val channel = produce {
repeat(5) { index ->
println("Producing next one")
delay(1000)
send(index*2)
}
}
for (element in channel) {
println(element)
}
}
위 코드를 보면 produce 로 생성자를 생성하고 생성된 채널을 소비자가 값을 받아서 출력하는 것을 볼 수 있습니다. 여기에서 생성자의 모든 행위가 끝났을 때 close()가 호출되고 최상위 루트 코루틴이 종료되어 프로세스가 종료되는 것을 확인할 수 있습니다.
설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.
Channel.UNLIMITED로 설정된 채널로, send가 중단되지 않습니다.Channel.BUFFERED(기본값 64)로 설정된 채널Channel.RENDEZVOUS(용량이 0입니다)인 채널로, 송신자와 수신자가 만날 때만 원소를 교환합니다.채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때의 행동을 정의할 수 있습니다.
(onBUfferOverflow)
SUSPEND(기본 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단됩니다.DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거됩니다.DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거됩니다.원소가 어떠한 이유로 처리되지 않을 때 호출되는 onUndeliveredElement 대부분 채널이 닫히거나 취소되었음을 의미하지만 send, receive, receiveOrNull 또는 hasNext가 에러를 던질 때 발생할 수도 있습니다. 주로 채널에서 보낸 자원을 닫을 때 사용합니다.
여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수 있습니다.

만약 생성자는 하나인데 소비자가 여러개라면 어떻게 될까요?
이때 채널은 원소를 공평하게 배분합니다. 채널에서 원소를 기다리는 소비자 코루틴들은 FIFO 큐로 가지고 있습니다.
여러개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다.
