[Coroutines] 채널과 친해지기

빙티·2025년 4월 27일

Kotlin Coroutines

목록 보기
1/2
post-thumbnail

💡 채널이란?
코루틴 간 연속된 데이터 스트림을 전달하기 위한 API

채널의 특징

  • 큐(Queue)처럼 선입선출로 데이터를 추가·삭제한다.
  • 채널로 전송된 모든 값은 단 한 번만 수신할 수 있다.
  • 일반적으로 송·수신자는 각각 하나이지만, 제한 없이 수를 설정할 수도 있다.


Channel 인터페이스는 서로 다른 두 인터페이스 SendChannelReceiveChannel를 구현한다.

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

값을 보낼 때 send(element: E)를, 채널을 닫을 때 close()를 사용한다.

interface SendChannel<in E> {
	suspend fun send(element: E) // 값 전송
	fun close(cause: Throwable? = null): Boolean // 채널 닫기
	// ...
}

ReceieveChannelreceive()는 값을 받을 때 사용한다.

interface ReceiveChannel<out E> {
	suspend fun receive(): E // 값 수신
	fun cancel(cause: CancellationException? = null) // 채널 취소
	// ...
}

send()receive()는 모두 supsend 함수이므로 코루틴 내부에서 호출해야 한다.
채널을 통해 값을 전달하는 예제 코드를 살펴보자.

val channel = Channel<Int>()

launch {
    for (x in 1..5) channel.send(x * x)
}

repeat(5) {
	println(channel.receive())
}

println("Done!")

결과는 다음과 같다.

1
4
9
16
25
Done!



💡 의문
위 코드는 for과 repeat로 데이터를 보내고 받는다.
만약 데이터를 보내는 횟수와 받는 횟수가 일치하지 않는다면 어떻게 될까?

이 궁금증을 해결하기 위해선 send()receive()가 언제 중단되는지 알아야 한다.
각 함수는 작업을 바로 수행할 수 없을 때 중단되며, 채널에서 대기할 수 있다. (정확히 말하면 채널의 송·수신 큐에 작업을 예약한다.)

아래는 각각 송·수신 작업이 대기 중인 채널을 출력한 결과다.

// 값 1을 보내려는 send 작업이 대기 중
capacity=0,data=[(send,1)]
// 여러 코루틴의 send 대기 가능
capacity=0,data=[(send,0),(send,1),(send,2)]

// 값을 받으려는 receive 작업이 대기 중
capacity=0,data=[receive]
// 여러 코루틴의 receive 대기 가능
capacity=0,data=[receive,receive,receive]

송신, 수신 작업을 바로 수행할 수 없을 때가 언제인지 정확히 이해하려면 선수 지식이 필요하다.




버퍼

Channel의 생성자 파라미터 capacity로 버퍼의 크기를 정할 수 있다.
버퍼는 채널이 임시로 저장할 수 있는 값의 개수, 즉 채널의 용량을 의미하며 기본 값은 0(RENDEZVOUS)이다.

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

각 함수가 중단하는 경우는 아래와 같다.
(버퍼 크기가 0인 채널을 '버퍼 없는 채널', 0 이상인 채널을 '버퍼 있는 채널'이라고 표현했다.)


send()

  • 버퍼 없는 채널 : 수신 작업이 대기 중이지 않을 때 중단
  • 버퍼 있는 채널 : 버퍼가 가득 차 있으면 중단

receive()

  • 버퍼 없는 채널 : 송신할 데이터가 대기 중이지 않을 때 중단
  • 버퍼 있는 채널 : 버퍼가 비어있으면 중단

버퍼 크기가 0인 채널에 send()를 호출하면 receive()를 만나야 재개된다.
반면 버퍼 크기가 N개인 채널은, 버퍼가 꽉 차기 전까지 중단 없이 send()를 호출할 수 있다.

마찬가지로 버퍼 크기가 0인 채널에 receive()를 호출하면 send()를 만나야 재개된다.
또한 버퍼가 가득 찬 상태에서 receive()를 호출하면, 버퍼 공간이 확보되기 전까지 중단된다.

그래서 send()receive()가 매치되지 않는다면 이를 호출한 코루틴은 계속 중단 상태로 남을 수 있다.




💡 의문
그렇다면 send와 receive를 매치하기 위해 송·수신값의 개수를 계속 체크해야 할까?

채널이 제공하는 아래 함수들을 사용하면 더 안전하고 편리하게 데이터를 보내고 받을 수 있다.


데이터를 보내는 동안만 채널 열어두기

코루틴 빌더인 produce를 사용하면, 값을 모두 보내고 해당 코루틴이 종료되었을 때 채널을 자동으로 닫을 수 있다.
produce는 내부에서 값을 보내는 생산자(producer) 코루틴을 만들고, 출력 수단ReceiveChannel을 반환하는 특별한 코루틴 빌더이다.
ReceiveChannel은 수신 전용 채널이므로 외부에서 해당 채널에 send()할 수 없다.
따라서 유일한 송신자는 produce 안의 send()로 제한된다.

launch {
	val channel = produce {
	    for (i in answer) {
	        send(i)
	    }
	}
    // channel.send(1) <- 호출 불가
}

채널이 열려있는 동안만 데이터 가져오기

수신자가 채널에서 총 몇개의 값이 올 지에 대한 정보를 아는 경우는 드물다.
이 때 for 루프consumeEach 함수를 이용하면, 채널이 닫힐 때까지 값을 받을 수 있다.

val channel = Channel<Int>()

launch {
    val nums1 = mutableListOf<Int>()
    for (element in channel) { // for loop로 받기
        nums1.add(element)
    }
}

launch {
    val nums2 = mutableListOf<Int>()
    channel.consumeEach { // consumeEach로 받기
        nums2.add(it)
    }
}
profile
할머니에게 설명할 수 없다면 제대로 이해한 게 아니다

0개의 댓글