코루틴 네가 내 별이다.
데이터를 stream 처럼 전송하기 위한 인터페이스며 구조는 BlockingQueue와 비슷하며, 동일하게 ThreadSafe 한 형태의 구조를 가지고 있습니다.
데이터를 전송하는 SendChannel 과 데이터를 소비하는 ReceiveChannel로 이루어져 있으며 flow와 다른 점은 직접 데이터를 생산하는 것이 아닌 전달의 개념이라는 것이 다른 점입니다.
Channel의 기본 생성자를 살펴보면
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
capacity, onBufferOverflow, onUndeliveredElement 이렇게 세 가지 매개변수를 제공합니다.
기본적으로 전송과 소비는 다음 함수를 사용합니다.
public fun trySend(element: E): ChannelResult<Unit>
public fun tryReceive(): ChannelResult<E>
public suspend fun send(element: E)
public suspend fun receive(): E
trySend
와 tryReceive
함수는 suspend 하지 않기 때문에 성공과 실패 여부를 ChannelResult 라는 클래스를 통해 리턴해줍니다.
send
와 receive
는 suspend 함수이기 때문에 다른 coroutine scope 간에도 전송과 소비가 가능합니다.
우선 기본 생성자로 채널을 생성하여 로그를 찍어보면
val channel: Channel<Int> = Channel()
repeat(10) {
channel.trySend(it)
}
channel.tryReceive().getOrNull().run {
println("받은 값 -> $this")
}
받은 값 -> null
위의 결과를 보면 분명 send를 했지만 로그에는 null이 찍히는 걸 볼 수 있는데
이는 Channel의 capacity를 알면 쉽게 이해할 수 있다.
일종의 버퍼 개념인데 Channel에선 다음과 같이 capacity를 제공합니다.
- RENDEZVOUS : 버퍼가 존재하지 않음
- CONFLATED : 버퍼가 1이며, 가장 나중에 들어온 데이터만 보장
- UNLIMITED : 버퍼가 무한대
- BUFFERED : 시스템이 정한 버퍼값(64개) 사용
Channel의 기본 생성자를 보면 capacity가 RENDEZVOUS 라 버퍼가 존재하지 않기 때문에 데이터를 저장하지 못해 null로 나오게 됩니다.
그렇다면 각 capacity 별 결과 값을 로그로 찍어보면 다음과 같습니다.
val channel: Channel<Int> = Channel(capacity = Channel.CONFLATED)
repeat(10) {
channel.trySend(it)
}
channel.tryReceive().getOrNull().run {
println("받은 값 -> $this")
}
받은 값 -> 9
가장 최근 값이 덮어 씌워지기 때문에 9가 찍히게 됩니다.
val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
repeat(10) {
channel.trySend(it)
}
channel.tryReceive().getOrNull().run {
println("받은 값 -> $this")
}
받은 값 -> 0
Channel의 버퍼에는 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 전부 저장
val channel: Channel<Int> = Channel(capacity = 5)
repeat(10) {
channel.trySend(it)
}
channel.tryReceive().getOrNull().run {
println("받은 값 -> $this")
}
받은 값 -> 0
Channel의 버퍼에는 0, 1, 2, 3, 4 까지 버퍼 개수만큼 저장
위와 같은 결과를 볼 수 있습니다.
버퍼가 가득 찼을 때 처리할 방법이고 다음과 같은 방식을 제공합니다.
- SUSPEND : 더 이상 데이터를 저장하지 않고 대기
- DROP_OLDEST : 가장 오래된 데이터를 제거하고 새로운 데이터를 저장
- DROP_LATEST : 가장 최신의 데이터를 제거하고 새로운 데이터를 저장
Channel의 기본 생성자를 보면 default 값은 SUSPEND이고 각 DROP_OLDEST는 Queue 방식이고 DROP_LATEST는 Stack 방식이라고 생각하면 쉽습니다.
또한 Channel에는 쉽게 flow로 변환하여 사용할 수 있게 확장 함수를 제공합니다.
- receiveAsFlow()
- consumeAsFlow()
두 함수의 차이점으로는
사용 방법은 일반적인 flow와 똑같습니다.
val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
repeat(10) {
channel.trySend(it)
}
CoroutineScope(Dispatchers.Default).launch {
channel.receiveAsFlow().onEach {
println("받은 값 -> $it")
}.collect()
}
받은 값 -> 0
받은 값 -> 1
받은 값 -> 2
받은 값 -> 3
받은 값 -> 4
받은 값 -> 5
받은 값 -> 6
받은 값 -> 7
받은 값 -> 8
받은 값 -> 9
val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
repeat(10) {
channel.trySend(it)
}
CoroutineScope(Dispatchers.Default).launch {
channel.consumeAsFlow().onEach {
println("받은 값 -> $it")
}.collect()
}
받은 값 -> 0
받은 값 -> 1
받은 값 -> 2
받은 값 -> 3
받은 값 -> 4
받은 값 -> 5
받은 값 -> 6
받은 값 -> 7
받은 값 -> 8
받은 값 -> 9