coroutine channel fan-in fan-out

참치돌고래·2022년 12월 12일
0

Fan-out

multiple coroutines 가 하나의 channel에서 수신받을 때, 적절하게 각 consumer가 comsume 하기 위해서 for-loop를 사용할 수 있다.

fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

suspend fun CoroutineScope.launchProcessor(
    id : Int,
    channel : ReceiveChannel<Int>
) = launch{
    for (msg in channel) {
        println("#$id received $msg")
    }
}
suspend fun main(): Unit = coroutineScope {
   val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}

다수의 consumer가 비동기적으로 돌아가고 있다. 출력문을 살펴보면 같은 Id 에 같은 일정한 값이 출력되는 것을 확인할 수 있다. 이러한 출력문이 찍히는 이유가 바로 fan-out 특성 때문이다.

fan-out은 여러게 수신자를 생성하고, channel에서 수신받은 데이터를 복사한다. 이러한 데이터는 채널의 모든 수신자가 사용할 수 있도록 만든다. 각 수신자는 데이터를 수신받고, 다른 수신사와 독립적으로 데이터를 처리한다.이러한 복사본을 통한 처리로 인해, 여러 코루틴이 동일한 데이터를 동시에 처리할 수 있게 된다.

  1. send() 메소드를 통해 queue에 데이터를 전송한다.
  2. receive() 메소드를 통해 queue안에 있는 데이터 복사본을 수신받는다.
  3. 이러한 복사본을 통해 각각의 수신자들은 독립적으로 데이터를 처리할 수 있게 된다.
  4. 수신사들이 데이터에 대한 처리를 끝내면 데이터는 queue에서 삭제된다.

앞 장에서 언급한 Channel 인터페이스가 동기적이고 한번 소모되는 것에 그친다면,
SendChannel, ReceiveChannel 인터페이스를 통해 구현한다면, 비동기적으로 동시에 작업이 가능해진다.

channel 인터페이스에서 boradcast() 메소드 사용

fan-in

fan-out과 반대로 여러개의 channel에서 한 개의 수신자(코루틴)으로 처리하는 것이다.

profile
안녕하세요

0개의 댓글