[Kotlin] Coroutine의 Channel 알아보기

박상군·2024년 3월 20일
1

Kotlin

목록 보기
5/9
post-thumbnail

코루틴 네가 내 별이다.

Channel 이란?

데이터를 stream 처럼 전송하기 위한 인터페이스며 구조는 BlockingQueue와 비슷하며, 동일하게 ThreadSafe 한 형태의 구조를 가지고 있습니다.

데이터를 전송하는 SendChannel 과 데이터를 소비하는 ReceiveChannel로 이루어져 있으며 flow와 다른 점은 직접 데이터를 생산하는 것이 아닌 전달의 개념이라는 것이 다른 점입니다.

Channel의 기본 생성자를 살펴보면

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

capacity, onBufferOverflow, onUndeliveredElement 이렇게 세 가지 매개변수를 제공합니다.

기본적으로 전송과 소비는 다음 함수를 사용합니다.

public fun trySend(element: E): ChannelResult<Unit>
public fun tryReceive(): ChannelResult<E>

public suspend fun send(element: E)
public suspend fun receive(): E

trySendtryReceive 함수는 suspend 하지 않기 때문에 성공과 실패 여부를 ChannelResult 라는 클래스를 통해 리턴해줍니다.
sendreceive는 suspend 함수이기 때문에 다른 coroutine scope 간에도 전송과 소비가 가능합니다.

우선 기본 생성자로 채널을 생성하여 로그를 찍어보면

val channel: Channel<Int> = Channel()
        repeat(10) {
            channel.trySend(it)
        }
        channel.tryReceive().getOrNull().run {
            println("받은 값 -> $this")
        }
        
받은 값 -> null

위의 결과를 보면 분명 send를 했지만 로그에는 null이 찍히는 걸 볼 수 있는데
이는 Channel의 capacity를 알면 쉽게 이해할 수 있다.

capacity란?

일종의 버퍼 개념인데 Channel에선 다음과 같이 capacity를 제공합니다.

  • RENDEZVOUS : 버퍼가 존재하지 않음
  • CONFLATED : 버퍼가 1이며, 가장 나중에 들어온 데이터만 보장
  • UNLIMITED : 버퍼가 무한대
  • BUFFERED : 시스템이 정한 버퍼값(64개) 사용

Channel의 기본 생성자를 보면 capacity가 RENDEZVOUS 라 버퍼가 존재하지 않기 때문에 데이터를 저장하지 못해 null로 나오게 됩니다.

그렇다면 각 capacity 별 결과 값을 로그로 찍어보면 다음과 같습니다.

val channel: Channel<Int> = Channel(capacity = Channel.CONFLATED)
        repeat(10) {
            channel.trySend(it)
        }
        channel.tryReceive().getOrNull().run {
            println("받은 값 -> $this")
        }
        
받은 값 -> 9

가장 최근 값이 덮어 씌워지기 때문에 9가 찍히게 됩니다.

val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
        repeat(10) {
            channel.trySend(it)
        }
        channel.tryReceive().getOrNull().run {
            println("받은 값 -> $this")
        }
        
받은 값 -> 0

Channel의 버퍼에는 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 전부 저장

val channel: Channel<Int> = Channel(capacity = 5)
        repeat(10) {
            channel.trySend(it)
        }
        channel.tryReceive().getOrNull().run {
            println("받은 값 -> $this")
        }
        
받은 값 -> 0

Channel의 버퍼에는 0, 1, 2, 3, 4 까지 버퍼 개수만큼 저장

위와 같은 결과를 볼 수 있습니다.

onBufferOverflow란?

버퍼가 가득 찼을 때 처리할 방법이고 다음과 같은 방식을 제공합니다.

  • SUSPEND : 더 이상 데이터를 저장하지 않고 대기
  • DROP_OLDEST : 가장 오래된 데이터를 제거하고 새로운 데이터를 저장
  • DROP_LATEST : 가장 최신의 데이터를 제거하고 새로운 데이터를 저장

Channel의 기본 생성자를 보면 default 값은 SUSPEND이고 각 DROP_OLDEST는 Queue 방식이고 DROP_LATEST는 Stack 방식이라고 생각하면 쉽습니다.

flow로 변환

또한 Channel에는 쉽게 flow로 변환하여 사용할 수 있게 확장 함수를 제공합니다.

  • receiveAsFlow()
  • consumeAsFlow()

두 함수의 차이점으로는

receiveAsFlow()

  • Channel을 Flow로 변환
  • Channel에서 소비자에게 데이터를 전송
  • 여러 개의 소비자가 같은 Channel에 대해 동시에 구독할 수 있음
  • 소비자가 데이터를 요청할 때마다 Channel에서 데이터를 받아와 Flow로 전달

consumeAsFlow()

  • Channel의 데이터를 소비하는 Flow를 생성
  • Channel에서 데이터를 소비하여 소비자에게 전달
  • 여러 개의 소비자가 같은 Channel에 대해 동시에 구독할 수 없음. 즉, 한 번에 하나의 소비자만이 데이터를 소비할 수 있음
  • Channel을 Flow로 변환하는 것이 아니라, 이미 생성된 Channel의 데이터를 소비하는 Flow를 생성

사용 방법은 일반적인 flow와 똑같습니다.

val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
        repeat(10) {
            channel.trySend(it)
        }
CoroutineScope(Dispatchers.Default).launch {
        channel.receiveAsFlow().onEach {
             println("받은 값 -> $it")
         }.collect()
    }
     
받은 값 -> 0
받은 값 -> 1
받은 값 -> 2
받은 값 -> 3
받은 값 -> 4
받은 값 -> 5
받은 값 -> 6
받은 값 -> 7
받은 값 -> 8
받은 값 -> 9
val channel: Channel<Int> = Channel(capacity = Channel.UNLIMITED)
        repeat(10) {
            channel.trySend(it)
        }
CoroutineScope(Dispatchers.Default).launch {
        channel.consumeAsFlow().onEach {
             println("받은 값 -> $it")
         }.collect()
    }
     
받은 값 -> 0
받은 값 -> 1
받은 값 -> 2
받은 값 -> 3
받은 값 -> 4
받은 값 -> 5
받은 값 -> 6
받은 값 -> 7
받은 값 -> 8
받은 값 -> 9

마무리

Reference

kotlin-channels

0개의 댓글