Channel

참치돌고래·2022년 12월 12일
0

channel

channel api : 코루틴 간에 primitive 통신을 위해 추가된 API

channel 로 보내진 모든 값들은 오직 한 번만 수신할 수 있다.

channel interface

  • SendChannel : send elements, close the channel
  • ReceiveChannel : receive elements
interface SendChannel<in E> {
	suspend fun send(element : E)
    fun close() : Boolean

interface REceiveChannel<out E> {
	suspend fun receive() : E
    fun cancel (cause : CancellationException? = null)
    }

interface Channel<E> : SendChannel<E>, ReciveChannel<E>

produce 함수를 통해 channel에 값을 전송하자. 단순히 send를 통해 channel에 전송할 경우에는 exception 발생 시, 수신측에서 timeout과 같은 제약조건을 두지 않는 이상 영원히 suspend되기 때문이다.

fun CoroutineScope.produceNumbers(
	max : Int
) : ReceiveChannel<Int> = produce {
		var x = 0;
        while (x < 5) send(x++)
	}

ReceiveChannel 을 반환값으로 설정한 덕분에, close함수는 호출될 것이다.

Channel Capacity type

  • Unlimited : unlimited capacity buffer, send 함수는 절대 중지되지 않는다.
  • Buffered : Channel.BUFFERED (Default 64). 크기만큼 중지되지 않는다.
  • Rendezvous(default) : 0, 오직 sender와 receiver가 만날때만.
  • Conflated : buffer size 1. 새로운 값이 항상 이전 값을 대체한다. 중지되지 않는다.

On buffer overflow

  • SUSPEND(default) : 버퍼가 가득차면, send 메소드는 중지된다.
  • DROP_OLDEST : 버퍼가 가득차면 가장 오래된 요소를 drop
  • DROP_LATEST : 퍼버가 가득차면 가장 최근 요소를 drop

이러한 option을 사용하기 위해서는 위에서 구현한 produce를 사용할 수 없다. Channel 함수를 사용해서 option을 구현해야한다.

suspend fun main(): Unit = coroutineScope {
	val channel = Channel<Int>(
    	capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
        )
        
        launch {
        	repeat(5) { index ->
            	channel.send(index * 2)
                delay(100)
                println("Sent")
            }
            channel.close()
   }

On undelivered element handler

val channel = Channel<Response>(
		capacity,
        onUndeliveredElement = { resource ->
        	resource.close()  
	         }
		)
    }
    
    val resourceToSend = openResource()
    channel.send(resourceToSend)
    
    vla resourceReceived = channel.receive()
    try{
    	// work with received resource
   	} finally {
    	resourceReceived.close()
    }

위의 메소드는 기본적으로 consume되지 않는 요소들을 감지하고 적절한 액션을 취할 수 있게 해준다.
Flow 객체의 에러를 핸들링(retrying..)하거나 로깅하는데 유용하다.

profile
안녕하세요

0개의 댓글