[Kotlin] Cold Stream vs Hot Stream

유정현·2024년 5월 23일
0

Stream

Stream에는 크게 Cold Stream과 Hot Stream이 있다.
두 유형을 정의하는 기준은 다음과 같다.
1. 데이터 생성 위치
2. 생성자가 발행한 데이터를 동시에 receiver(소비자들)이 수신 가능 여부
3. 스트림이 값을 생산하는 시점

Cold Stream(Flow)

  1. 데이터가 스트림 내부에서 생성
    flow builder에는 flow, flowOf, asFlow가 있다.
    아래 코드는 flow로 데이터가 내부에서 emit 되고 있음을 확인할 수 있다.
// flow 내부에서 데이터 생성
val coldStream1: Flow<Int> = flow {
    emit(1)
    emit(2)
}
  1. 소비자가 소비를 시작할 때 데이터를 생산
    // flow 내부에서 데이터 생성
    val coldStream1: Flow<Int> = flow {
        emit(1)
        emit(2)
    }

    coldStream1.collect { println(it) }

flow는 소비를 시작하는 함수인 collect, reduce, fist 등의 종단연산자가 호출되지 않으면 데이터를 생산하지 않는다. 중간연산자(map, onEach, filter ..)도 종단연산자가 호출되어야 실행된다.

  1. 하나의 생산자에 하나의 소비자만 존재(UniCast)
    val coldStream = flow<Int> {
        emit(1)
        emit(2)
    }

    coldStream.collect { v -> println(v) }

    coldStream.collect { v -> println(v) }

위 코드에서 보면 flow를 여러 곳에서 collect할 수 있다.
하지만 collect를 할 때마다 새로운 block이 실행된다. 즉 이전 구독과는 독립적, 하나의 생산자에 하나의 소비자만 존재한다는 것을 의미한다.

Hot Stream(StateFlow, SharedFlow)

  1. 데이터가 외부에서 생성
CoroutineScope(Dispatchers.Default).launch { 
        val channel = Channel<Int>()
        
        launch { 
            // 외부에서 데이터 생성
            channel.send(1)
        }
        
        launch { 
            channel.send(2)
        }
        
        channel.consumeEach { v -> println(v) }
    }

위에서 channel의 send를 통해 데이터를 생성한다.
flow는 내부에서 데이터를 생성했지만, hot stream은 외부에서 데이터를 생성해준다.

  1. 소비자가 데이터에 접근하지 않더라도 계속 데이터 생산(소비를 신경쓰지 않고 생산)
val channel = Channel<Int>(5)

    channel.trySend(1)
    println(channel.isEmpty) // false

channel에 trySend로 값을 넣고 따로 소비하지 않아도 channel이 비어있지 않음을 확인할 수 있다.

  1. 하나의 생산자에 여러 소비자에게 데이터 소비 (MultiCast)
fun main() {
    runBlocking {
        // channel에 일정 간격으로 데이터를 send 해줌
        val receiveChannel = produce {
            println(coroutineContext)
            var count = 0
            while (true) {
                send(count++)
                delay(100)
            }
        }

        launch {
            // 코루틴 빌더로 3개의 코루틴 생성
            repeat(3) { index ->
                launch {
                    consumeNumbers(index, receiveChannel)
                }
            }
        }
    }
}

fun CoroutineScope.consumeNumbers(index: Int, receiveChannel: ReceiveChannel<Int>) = launch {
    receiveChannel.consumeEach {
    	// 각 coroutine 내부에 ReceiveChannel을 통해 수신한 데이터 출력
        println("${index}${it}를 수신")
    }
}

produce?
CoroutineScope의 확장함수로 매개변수 중에 block이 존재하고 수신객체는 ProducerScope이다. 이때 ProducerScope은 CoroutineScope, SendChannel을 구현하고 있는 클래스여서 Coroutine을 생성하면서 SendChannel의 역할을 할 수 있으며 ReciveChannel을 반환한다.

각각 다른 coroutine에 존재하는 receiver가 하나의 데이터 스트림을 소비하고 있다.
또한 소비자가 소비를 시작한 시점부터 생산된 데이터를 소비한다.

정리

cold stream과 hot stream을 비유해서 정리하자면..
Cold Stream은 CD Player, Hot Stream은 Radio이다.

  • Cold Stream : CD Player
    CD Player는 각 CD 내부에 음악이 저장되어 있다(데이터가 내부에서 생성).
    사용자마다 음악을 재생하지만(소비), 모든 사용자가 하나의 CD를 공유하지 않는다(UniCast).

  • Hot Stream : Radio
    Radio는 방송국에서 프로그램을 제작하고(외부에서 데이터 생성) 청취자들에게 동시에 송신한다(MultiCast). 중간부터 청취하기 시작한 사람들은 그 시점부터 청취 가능하다(생산자가 소비자의 소비를 신경쓰지 않고 생산).


참고

Medium

0개의 댓글

관련 채용 정보