코틀린 코루틴 3.18 - 핫 데이터와 콜드 데이터

Seogi·2025년 8월 18일

Kotlin

목록 보기
21/27

채널은 값을 핫(hot) 스트림으로 가지지만, 콜드(cold) 스트림이 필요할 때가 있다.

우리가 사용하는 대부분의 데이터 소스는 두 가지 종류로 구분할 수 있다.

  • 핫 스트림: 컬렉션(List, Set), Channel
  • 콜드 스트림: Sequence, Stream, Flow, RxJava 스트림

핫 vs 콜드

핫 데이터 스트림은 데이터를 소비하는 것과 무관하게 원소를 생성한다.

콜드 데이터 스트림은 요청이 있을 때만 작업을 수행하며 아무것도 저장하지 않는다.

리스트(핫)와 시퀀스(콜드)를 사용할 때 그 차이가 드러난다.

fun main() {
    val l = buildList {
        repeat(3) {
            add("User$it")
            println("L: Added User")
        }
    }

    val l2 = l.map {
        println("L: Processing")
        "Processed $it"
    }

    val s = sequence {
        repeat(3) {
            yield("User$it")
            println("S: Added User")
        }
    }

    val s2 = s.map {
        println("S: Processing")
        "Processed $it"
    }
}

// L: Added User
// L: Added User
// L: Added User
// L: Processing
// L: Processing
// L: Processing

핫 데이터 스트림의 빌더와 연산은 즉각 실행된다.

콜드 데이터 스트림에서는 원소가 필요할 때까지 실행되지 않는다.

그 결과 (Sequence, Stream, Flow와 같은) 콜드 데이터 스트림은

  • 무한할 수 있다.
  • 최소한의 연산만 수행한다.
  • (중간에 생성되는 값들을 보관할 필요가 없기 때문에)메모리를 적게 사용한다.

Sequence는 원소를 지연 처리하기 때문에 더 적은 연산을 수행한다.

작동 방식은 간단하다.

  • map이나 filter와 같은 중간 연산은 이전에 만든 시퀀스에 새로운 연산을 첨가할 뿐이다.
  • 최종 연산이 모든 작업을 실행한다.
fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun f(i: Int): Boolean {
    print("f$i ")
    return i >= 10
}

fun main() {
    listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 f4 f9 f16 16

    println()

    sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 f1 m2 f4 m3 f9 m4 f16 16
}

시퀀스의 처리방식은 모든 중간 과정을 계산하고 모든 데이터 처리가 완료된 컬렉션을 반환하는 리스트의 처리방식과 다르다.

따라서, 리스트의 경우 원소의 처리 순서가 달라지며, 컬렉션 처리 과정에서 좀 더 많은 메모리를 필요로 하고, 더 많은 연산을 수행하게 된다.

fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun main() {
    val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }         // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10

    println(l)                 // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    println(l.find { it > 10 }) // 16
    println(l.find { it > 10 }) // 16


    println(l.find { it > 10 }) // 16

    val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }

    println(s.toList())
    // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
}

핫 데이터 스트림

  • 항상 사용 가능한 상태이다(각 연산이 최종 연산이 될 수 있다).
  • 여러 번 사용되었을 때 매번 결과를 다시 계산할 필요가 없다.

자바의 Stream은 코틀린의 Sequence와 비슷한 특징을 가지고 있다.
둘 모두 콜드 스트림 데이터이다.

핫 채널, 콜드 플로우

val channel = produce {
    while (true) {
        val x = computeNextValue()
        send(x)
    }
}

val flow = flow {
    while (true) {
        val x = computeNextValue()
        emit(x)
    }
}

플로우를 생성하는 가장 일반적인 방법은 produce 함수와 비슷한 형태인 flow 빌더를 사용하는 것이다.

private fun CoroutineScope.makeChannel() = produce {
    println("Channel started")
    for (i in 1..3) {
        delay(1000)
        send(i)
    }
}

suspend fun main() = coroutineScope {
    val channel = makeChannel()

    delay(1000)
    println("Calling channel...")
    for (value in channel) {
        println(value)
    }

    println("Consuming again...")
    for (value in channel) {
        println(value)
    }
}
Channel started
(1초 후)
Calling channel...
1
(1초 후)
2
(1초 후)
3
Consuming again...

위 예제에서는 버퍼의 크기가 0인 랑데뷰 채널이라 곧 중단되며 수신자가 준비될 때까지 재개되지 않는다. 수신자가 없을 때 데이터 생성이 중단되는 것과 요청할 때 데이터를 생성하는 것의 차이를 알아야 한다.

채널은 핫 데이터 스트림이기에 소비되는 것과 상관없이 값을 생성한 뒤에 가지게 된다.

또한 별도의 코루틴에서 계산을 수행하기에 produceCoroutineScope의 확장 함수로 정의되어 있는 코루틴 빌더가 되어야 한다.

수신자가 얼마나 많은지에 대해선 신경 쓰지 않는다. 각 원소는 단 한 번만 받을 수 있기에, 첫 번째 수신자가 모든 원소를 소비하고 나면 두 번째 소비자는 채널이 비어있는걸 발견하고, 어떤 원소도 발견할 수 없다.

private fun makeFlow() = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

suspend fun main() = coroutineScope {
    val flow = makeFlow()

    delay(1000)
    println("Calling flow...")
    flow.collect { value -> println(value) }

    println("Consuming again...")
    flow.collect { value -> println(value) }
}

(1초 후)
Calling flow...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
Consuming again...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3

플로우를 사용해 처리하는 방식은 매우 다르다.

플로우는 콜드 데이터 소스이기 때문에 값이 필요할 때만 생성한다.

따라서 flow는 빌더라기 보단 collect와 같은 최종 연산이 호출될 때 원소가 어떻게 생성되어야 하는지 정의한 것에 불과하다.

그래서 flow 빌더는 CoroutineScope가 필요하지 않고, 빌더를 호출한 최종 연산의 스코프에서 실행된다(coroutineScope와 다른 코루틴 스코프 함수처럼 중단 함수의 continuation 객체로부터 스코프를 가져온다).

플로우의 각 최종 연산은 처음부터 데이터를 처리하기 시작한다.

0개의 댓글