채널은 값을 핫(hot) 스트림으로 가지지만, 콜드(cold) 스트림이 필요할 때가 있다.
우리가 사용하는 대부분의 데이터 소스는 두 가지 종류로 구분할 수 있다.
핫 데이터 스트림은 데이터를 소비하는 것과 무관하게 원소를 생성한다.
콜드 데이터 스트림은 요청이 있을 때만 작업을 수행하며 아무것도 저장하지 않는다.
리스트(핫)와 시퀀스(콜드)를 사용할 때 그 차이가 드러난다.
fun main() {
val l = buildList {
repeat(3) {
add("User$it")
println("L: Added User")
}
}
val l2 = l.map {
println("L: Processing")
"Processed $it"
}
val s = sequence {
repeat(3) {
yield("User$it")
println("S: Added User")
}
}
val s2 = s.map {
println("S: Processing")
"Processed $it"
}
}
// L: Added User
// L: Added User
// L: Added User
// L: Processing
// L: Processing
// L: Processing
핫 데이터 스트림의 빌더와 연산은 즉각 실행된다.
콜드 데이터 스트림에서는 원소가 필요할 때까지 실행되지 않는다.
그 결과 (Sequence, Stream, Flow와 같은) 콜드 데이터 스트림은
Sequence는 원소를 지연 처리하기 때문에 더 적은 연산을 수행한다.
작동 방식은 간단하다.
map이나 filter와 같은 중간 연산은 이전에 만든 시퀀스에 새로운 연산을 첨가할 뿐이다.fun m(i: Int): Int {
print("m$i ")
return i * i
}
fun f(i: Int): Boolean {
print("f$i ")
return i >= 10
}
fun main() {
listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { m(it) }
.find { f(it) }
.let { print(it) }
// m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 f4 f9 f16 16
println()
sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { m(it) }
.find { f(it) }
.let { print(it) }
// m1 f1 m2 f4 m3 f9 m4 f16 16
}
시퀀스의 처리방식은 모든 중간 과정을 계산하고 모든 데이터 처리가 완료된 컬렉션을 반환하는 리스트의 처리방식과 다르다.
따라서, 리스트의 경우 원소의 처리 순서가 달라지며, 컬렉션 처리 과정에서 좀 더 많은 메모리를 필요로 하고, 더 많은 연산을 수행하게 된다.
fun m(i: Int): Int {
print("m$i ")
return i * i
}
fun main() {
val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { m(it) } // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10
println(l) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
println(l.find { it > 10 }) // 16
println(l.find { it > 10 }) // 16
println(l.find { it > 10 }) // 16
val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { m(it) }
println(s.toList())
// [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
println(s.find { it > 10 }) // m1 m2 m3 m4 16
println(s.find { it > 10 }) // m1 m2 m3 m4 16
println(s.find { it > 10 }) // m1 m2 m3 m4 16
}
핫 데이터 스트림은
자바의 Stream은 코틀린의 Sequence와 비슷한 특징을 가지고 있다.
둘 모두 콜드 스트림 데이터이다.
val channel = produce {
while (true) {
val x = computeNextValue()
send(x)
}
}
val flow = flow {
while (true) {
val x = computeNextValue()
emit(x)
}
}
플로우를 생성하는 가장 일반적인 방법은 produce 함수와 비슷한 형태인 flow 빌더를 사용하는 것이다.
private fun CoroutineScope.makeChannel() = produce {
println("Channel started")
for (i in 1..3) {
delay(1000)
send(i)
}
}
suspend fun main() = coroutineScope {
val channel = makeChannel()
delay(1000)
println("Calling channel...")
for (value in channel) {
println(value)
}
println("Consuming again...")
for (value in channel) {
println(value)
}
}
Channel started
(1초 후)
Calling channel...
1
(1초 후)
2
(1초 후)
3
Consuming again...
위 예제에서는 버퍼의 크기가 0인 랑데뷰 채널이라 곧 중단되며 수신자가 준비될 때까지 재개되지 않는다. 수신자가 없을 때 데이터 생성이 중단되는 것과 요청할 때 데이터를 생성하는 것의 차이를 알아야 한다.
채널은 핫 데이터 스트림이기에 소비되는 것과 상관없이 값을 생성한 뒤에 가지게 된다.
또한 별도의 코루틴에서 계산을 수행하기에 produce는 CoroutineScope의 확장 함수로 정의되어 있는 코루틴 빌더가 되어야 한다.
수신자가 얼마나 많은지에 대해선 신경 쓰지 않는다. 각 원소는 단 한 번만 받을 수 있기에, 첫 번째 수신자가 모든 원소를 소비하고 나면 두 번째 소비자는 채널이 비어있는걸 발견하고, 어떤 원소도 발견할 수 없다.
private fun makeFlow() = flow {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
suspend fun main() = coroutineScope {
val flow = makeFlow()
delay(1000)
println("Calling flow...")
flow.collect { value -> println(value) }
println("Consuming again...")
flow.collect { value -> println(value) }
}
(1초 후)
Calling flow...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
Consuming again...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
플로우를 사용해 처리하는 방식은 매우 다르다.
플로우는 콜드 데이터 소스이기 때문에 값이 필요할 때만 생성한다.
따라서 flow는 빌더라기 보단 collect와 같은 최종 연산이 호출될 때 원소가 어떻게 생성되어야 하는지 정의한 것에 불과하다.
그래서 flow 빌더는 CoroutineScope가 필요하지 않고, 빌더를 호출한 최종 연산의 스코프에서 실행된다(coroutineScope와 다른 코루틴 스코프 함수처럼 중단 함수의 continuation 객체로부터 스코프를 가져온다).
플로우의 각 최종 연산은 처음부터 데이터를 처리하기 시작한다.