[Kotlin] Flow - Channel,Buffer

KSang·2024년 5월 7일
0

TIL

목록 보기
98/101

Channel

Kotlin의 채널은 큐와 유사한 개념이다. 코루틴 간 데이터를 전송하고 수신하는 통신 수단을 제공한다.

채널과 관련 있는 코루틴으로 Producer와 Consumer가 있다.

  • 생산자(Producer): 데이터를 채널에 보내는 코루틴
  • 소비자(Consumer): 채널에서 데이터를 받는 코루틴
suspend fun main(): Unit = coroutineScope {

    val channel = produce<Int> {
        println("Sending 10")
        send(10)

        println("Sending 20")
        send(20)
    }

    launch {
        channel.consumeEach { receivedValue ->
            println("Consumer1: $receivedValue")
        }
    }

    launch {
        channel.consumeEach { receivedValue ->
            println("Consumer2: $receivedValue")
        }
    }
}

produce는 채널로 데이터를 보내는 스트림을 생성하기위해 새 코루틴을 시작하는 코루틴 빌더다.
여기서 정수형 채널을 초기화하고 숫자 10,20을 보낸다.
이후 produce는 데이터를 수신할 수 있는 ReceiveChanne를 반환한다.
send함수를 이용해 코루틴 내에서 데이터를 채널로 보낸다.
데이터를 보낼 수 있을때까지 코루틴은 일시 중지된다.

이후 consumeEach 를 통해 채널에 수신된 모든 요소를 처리하기 시작한다.

코틀린 채널은 기본적으로 브로드 캐스트 채널이 아닌데, 그래서 요소가 다른 소비자에 의해 소비되면, 다른 소비자는 소비된 요소를 받지 못한다.

이코드애선 Consumer1: 에서 10을 받아 Consumer2 에선 10을 받지 못한다.

Consumer1에서 수신되는 데이터를 처리하는 동안 Consumer2에서 데이터를 받아 둘이 숫자를 10,20나눠 갖는다.

만약 consumeEach하는 스코프가 Consumer1밖에 없으면 Consumer1에서 10데이터를 처리하고 이어서 20의 데이터도 처리한다.

suspend fun main(): Unit = coroutineScope {

    val channel = Channel<Int>()

    launch {
        channel
            .consumeAsFlow()
            .collect {
                println("Process $it")
                delay(1000)
                println("$it processed")
            }
    }

    launch {

        delay(100)

        // 1 should be processed
        channel.trySend(1)
        println("sharedFlow emits 1")

        // 2 should not be processed since downstream is busy
        channel.trySend(2)
        println("sharedFlow emits 2")

        // 3 should be processed again
        delay(2000)
        channel.trySend(3)
        println("sharedFlow emits 3")
    }
}

다음 함수는 채널을 만들고 Send해서 요소를 관찰한다.

여기서 channel이란 이름의 정수용 채널을 생성한다.

consumeAsFlow로 collect 가능한 Flow로 변환한다.

기본적으로 Buffered채널을 사용해, 버퍼의 크기가 1이다.

그렇게되면 launch에서 trySend(1)을 보내고 채널에서 로직이 수행되는대 delay가 1초임으로 trySend(2)는 실패하게된다.

왜냐면 버퍼의 크기가 1이며 trySend는 채널의 상태를 확인하지 않고 즉시 반환 되기때문에, 채널이 더이상 항목을 수용 할 수 없는 경우 즉시 실패하게 된다.

send같은경우 데이터가 처리되지 않았다면 코루틴을 블로킹하지 않고 일시중단해 전부 처리된다.

Buffer

suspend fun main() = coroutineScope {

    val flow = flow {
        repeat(5) {
            println("Emitter:    Start Cooking Pancake $it")
            delay(100)
            println("Emitter:    Pancake $it ready!")
            emit(it)
        }
    }.buffer()

    flow.collect {
        println("Collector:  Start eating pancake $it")
        delay(300)
        println("Collector:  Finished eating pancake $it")
    }
}

팬케이크를 만들고 collect에서 팬케이크를 먹는 함수가 있다.

함수를 실행하면 이런 결과가 나온다.

Emitter:    Start Cooking Pancake 0
Emitter:    Pancake 0 ready!
Collector:  Start eating pancake 0
Emitter:    Start Cooking Pancake 1
Emitter:    Pancake 1 ready!
Emitter:    Start Cooking Pancake 2
Emitter:    Pancake 2 ready!
Emitter:    Start Cooking Pancake 3
Collector:  Finished eating pancake 0
Collector:  Start eating pancake 1
Emitter:    Pancake 3 ready!
Emitter:    Start Cooking Pancake 4
Emitter:    Pancake 4 ready!
Collector:  Finished eating pancake 1
Collector:  Start eating pancake 2
Collector:  Finished eating pancake 2
Collector:  Start eating pancake 3
Collector:  Finished eating pancake 3
Collector:  Start eating pancake 4
Collector:  Finished eating pancake 4

팬케이크를 다먹을때까지 기다리지않고 바로 다음 팬캐이크를 준비한다.

스코프에서 아직 println 로직을 처리중인대 어째서 emit되어 관찰이 될까?

.buffer() 연산자와 연관이 있는데, 여기서 콜렉터가 처리를 끝내기를 기다리지 않고 방출자가 미리 값을 방출할 수 있도록 해준다.

방출자는 지연 없이 계속해서 팬케이크를 준비할 수 있으며, 별도의 버퍼에 저장된다.

버퍼는 capcity와 onBufferOverflow를 설정해줄 수 있다.

suspend fun main() = coroutineScope {

    val flow = flow {
        repeat(5) {
            val pancakeIndex = it + 1
            println("Emitter:    Start Cooking Pancake $pancakeIndex")
            delay(100)
            println("Emitter:    Pancake $pancakeIndex ready!")
            emit(pancakeIndex)
        }
    }.buffer(capacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)

    flow.collect {
        println("Collector:  Start eating pancake $it")
        delay(300)
        println("Collector:  Finished eating pancake $it")
    }
}

capacity 버퍼의 크기를 하나로 제한한다.

BufferOverflow.SUSPEN는 버퍼가 가득 찬 경우 방출자 emitter를 일시 중단하고, 버퍼에 공간이 생길 때 까지 기다리게 한다.

flow가 콜렉터의 속도를 초과해 데이터를 방출하지 않도록 보장한다.

함수를 실행해보자

Emitter:    Start Cooking Pancake 1
Emitter:    Pancake 1 ready!
Collector:  Start eating pancake 1
Emitter:    Start Cooking Pancake 2
Emitter:    Pancake 2 ready!
Emitter:    Start Cooking Pancake 3
Emitter:    Pancake 3 ready!
Collector:  Finished eating pancake 1
Collector:  Start eating pancake 2
Emitter:    Start Cooking Pancake 4
Emitter:    Pancake 4 ready!
Collector:  Finished eating pancake 2
Collector:  Start eating pancake 3
Emitter:    Start Cooking Pancake 5
Emitter:    Pancake 5 ready!
Collector:  Finished eating pancake 3
Collector:  Start eating pancake 4
Collector:  Finished eating pancake 4
Collector:  Start eating pancake 5
Collector:  Finished eating pancake 5

첫번째 케이크를 만들고 collecter에 요소를 방출했다.

케이크를 먹는동안 케이크2가 만들어졌는데, collecter가 아직 팬케이크를 먹고 있으니 buffer에 저장한다.

이후 케이크3이 만들어졌지만 buffer에 공간이 없어서 emit을 실행하지 않고 버퍼에 공간이 생길 때 까지 대기한다.

이후 케이크1을 다먹고 케이크2가 collecter로 이동해 buffer에 빈공간이 생겨 emit을 수행한다.

BufferOverflow는 SUSPEND말고도 지원하는 기능이 더 있다.

DROP_OLDEST를 사용하면 버퍼에 공간이 없을때 버퍼를 차지하고 있는 오래된 요소를 없에고 방출한 다음 로직을 수행한다.

DROP_LATEST는 그와 반대로 버퍼에 공간이 없으면 버퍼에 있는 가장 최근 아이템을 삭제하고 새 아이템을 그 자리에 넣는다.

capacipy의 경우 명시 하지 않을 시 기본 크기는 16인데, UNLIMITED를 사용해 제한없이 사용할 수 있다.

0개의 댓글