Stream에는 크게 Cold Stream과 Hot Stream이 있다.
두 유형을 정의하는 기준은 다음과 같다.
1. 데이터 생성 위치
2. 생성자가 발행한 데이터를 동시에 receiver(소비자들)이 수신 가능 여부
3. 스트림이 값을 생산하는 시점
// flow 내부에서 데이터 생성
val coldStream1: Flow<Int> = flow {
emit(1)
emit(2)
}
// flow 내부에서 데이터 생성
val coldStream1: Flow<Int> = flow {
emit(1)
emit(2)
}
coldStream1.collect { println(it) }
flow는 소비를 시작하는 함수인 collect, reduce, fist 등의 종단연산자가 호출되지 않으면 데이터를 생산하지 않는다. 중간연산자(map, onEach, filter ..)도 종단연산자가 호출되어야 실행된다.
val coldStream = flow<Int> {
emit(1)
emit(2)
}
coldStream.collect { v -> println(v) }
coldStream.collect { v -> println(v) }
위 코드에서 보면 flow를 여러 곳에서 collect할 수 있다.
하지만 collect를 할 때마다 새로운 block이 실행된다. 즉 이전 구독과는 독립적, 하나의 생산자에 하나의 소비자만 존재한다는 것을 의미한다.
CoroutineScope(Dispatchers.Default).launch {
val channel = Channel<Int>()
launch {
// 외부에서 데이터 생성
channel.send(1)
}
launch {
channel.send(2)
}
channel.consumeEach { v -> println(v) }
}
위에서 channel의 send를 통해 데이터를 생성한다.
flow는 내부에서 데이터를 생성했지만, hot stream은 외부에서 데이터를 생성해준다.
val channel = Channel<Int>(5)
channel.trySend(1)
println(channel.isEmpty) // false
channel에 trySend로 값을 넣고 따로 소비하지 않아도 channel이 비어있지 않음을 확인할 수 있다.
fun main() {
runBlocking {
// channel에 일정 간격으로 데이터를 send 해줌
val receiveChannel = produce {
println(coroutineContext)
var count = 0
while (true) {
send(count++)
delay(100)
}
}
launch {
// 코루틴 빌더로 3개의 코루틴 생성
repeat(3) { index ->
launch {
consumeNumbers(index, receiveChannel)
}
}
}
}
}
fun CoroutineScope.consumeNumbers(index: Int, receiveChannel: ReceiveChannel<Int>) = launch {
receiveChannel.consumeEach {
// 각 coroutine 내부에 ReceiveChannel을 통해 수신한 데이터 출력
println("${index}가 ${it}를 수신")
}
}
produce?
CoroutineScope의 확장함수로 매개변수 중에 block이 존재하고 수신객체는 ProducerScope이다. 이때 ProducerScope은 CoroutineScope, SendChannel을 구현하고 있는 클래스여서 Coroutine을 생성하면서 SendChannel의 역할을 할 수 있으며 ReciveChannel을 반환한다.
각각 다른 coroutine에 존재하는 receiver가 하나의 데이터 스트림을 소비하고 있다.
또한 소비자가 소비를 시작한 시점부터 생산된 데이터를 소비한다.
cold stream과 hot stream을 비유해서 정리하자면..
Cold Stream은 CD Player, Hot Stream은 Radio이다.
Cold Stream : CD Player
CD Player는 각 CD 내부에 음악이 저장되어 있다(데이터가 내부에서 생성).
사용자마다 음악을 재생하지만(소비), 모든 사용자가 하나의 CD를 공유하지 않는다(UniCast).
Hot Stream : Radio
Radio는 방송국에서 프로그램을 제작하고(외부에서 데이터 생성) 청취자들에게 동시에 송신한다(MultiCast). 중간부터 청취하기 시작한 사람들은 그 시점부터 청취 가능하다(생산자가 소비자의 소비를 신경쓰지 않고 생산).