[Kotlin] 대규모 데이터 처리를 위한 Sequence 알아보기 (1)

Kame·2025년 6월 7일
0

Kotlin

목록 보기
5/9
post-thumbnail

들어가며

이 글에서는 Kotlin Sequence의 이해를 돕는 것을 목표로 합니다.

아래 사항 중 하나라도 해당된다면, 이 글이 도움이 될 것입니다.

🤷‍♂️ Collection은 아는데, Sequence는 모르겠다.
🤷‍♂️ Sequence가 어떻게 구현되어 있는지 알고 싶다.

🤷 Collection도 있는데, Sequence를 언제 / 왜 사용해야 하는지 알고 싶다.
🤷‍♀️ Sequence로 작성된 코드를 읽는 방법을 알고 싶다.

선행 지식


컬렉션(Collection)

Kotlin에서 여러 데이터를 담고 처리하기 위해 List, Set, Map 등의 컬렉션을 사용합니다. 이러한 컬렉션들의 특징은 모든 연산을 즉시 평가(Eager Evaluation) 한다는 것입니다.

여기서 ‘즉시 평가’ 한다는 것은 연산자의 호출 시점이 항상 수행 시점과 일치함을 의미합니다. 아래 예시를 살펴보면, 연산자의 호출이 일어날 때마다 즉시 전체 데이터를 처리하고, 중간 결과를 새로운 리스트로 만들고 있습니다.

val numbers = listOf(1, 2, 3, 4, 5)

val doubledNumbers = numbers
	.map { 
        println("map: $it")
  		it * 2
	}.filter {
		println("filter: $it")
		it > 5 
	}

println(doubledNumbers)

map: 1
map: 2
map: 3
map: 4
map: 5
filter: 2
filter: 4
filter: 6
filter: 8
filter: 10
[6, 8, 10]

문제점

컬렉션 연산자들을 연속적으로 호출하는 형태의 체이닝을 활용하면, 각각의 연산마다 전체 원소를 순회하며 새로운 컬렉션을 만들어냅니다. 예를 들어, 다음과 같은 코드에서는 최종적으로 사용되는 컬렉션은 하나임에도 체인의 각 연산자마다 새로운 컬렉션을 생성하며 각각의 원소를 모두 순회하고 있습니다.

val result = (1..1_000_000)
    .map { 
        println("map: $it")
        it * 2 
    }
    .filter { 
        println("filter: $it")
        it % 3 == 0 
    }
    .take(3)
    
println(result)

map: 1
… (map: 2 ~ map: 999999 생략) …
map: 1000000
filter: 2
… (filter: 4 ~ filter: 1999998 생략) …
filter: 2000000
[6, 12, 18]

  • map : 100만 개의 숫자를 모두 두 배로 만든 새로운 리스트 생성
  • filter : 앞선 결과에서 3의 배수만 걸러내어 새로운 리스트 생성
  • take : 앞선 결과에서 처음 3개만 남긴 새로운 리스트 생성

map, filter, take의 호출 시마다 전부 즉시 평가되어, 1,000,000개의 중간 결과를 매 단계마다 생성하게 됩니다. 이로 인해 대량의 데이터로 이뤄진 컬렉션을 활용하면 불필요한 메모리 사용과 성능 저하가 발생할 수 있습니다.


시퀀스(Sequence)

필요한 시점에만 데이터를 활용하거나 계산하는 것은 어떨까?

정의

Kotlin의 Sequence는 Java의 Stream과 유사한 수단으로, 지연 평가(Lazy Evaluation) 원리를 활용하는 특별한 데이터 구조입니다.

필요성

앞서 살펴본 비효율적인 컬렉션 활용 코드를 시퀀스를 사용하여 개선해보겠습니다.

val result = (1..1_000_000).asSequence()
    .map { 
        println("map: $it")
        it * 2 
    }
    .filter { 
        println("filter: $it")
        it % 3 == 0 
    }
    .take(3)
    .toList()
    
println(result)

map: 1
filter: 2
map: 2
filter: 4
map: 3
filter: 6
map: 4
filter: 8
map: 5
filter: 10
map: 6
filter: 12
map: 7
filter: 14
map: 8
filter: 16
map: 9
filter: 18
[6, 12, 18]

컬렉션과 달리 중간 결과를 즉시 만들지 않고 최종 결과가 필요할 때만(toList() 호출) 실제 연산을 수행합니다. 또한 컬렉션을 활용했을 때보다 확연히 적은 양의 처리가 일어나고 있습니다.

mapfilter요소마다 번갈아 호출되고, 조건을 만족하는 요소가 5개 모이면 그 즉시 연산을 멈추고 있습니다. 그 원리를 이어질 내용에서 살펴보겠습니다.

특징

컬렉션과의 공통점

이터레이터(iterator)

컬렉션와 시퀀스는 모두 내부적으로 이터레이터(iterator) 를 통해 원소를 하나씩 순회합니다. 즉, 두 구조 모두 데이터를 차례대로 접근하기 위한 표준적인 방법인 hasNext()next() 메서드를 활용합니다.

  • for (item in collection) 구조는 사실 collection.iterator() 를 내부적으로 호출합니다.
  • 시퀀스는 이 구조를 유지하면서 지연 처리 특성을 더한 형태입니다.

함수형 프로그래밍(Functional Programming)

시퀀스의 프로그래밍 스타일은 컬렉션과 매우 유사합니다.

이는 Kotlin의 컬렉션과 시퀀스 모두 이터레이터 기반으로 구현되어 있기 때문입니다. 따라서 둘 다 map, filter, flatMap 등의 고차 함수를 이용하여 데이터를 선언적(선언형)으로 처리합니다. 이는 for-loop 등을 활용하는 명령형 프로그래밍에 비해 많은 강점을 가져옵니다.

컬렉션과의 차이점

지연 처리(Lazy Evaluation)

컬렉션과 시퀀스의 가장 중요한 차이점은 이터레이터를 어떻게 운용하느냐에 있습니다. 컬렉션은 ‘즉시 평가’, 시퀀스는 ‘지연 평가’ 방식을 활용합니다.

항목컬렉션시퀀스
연산 시점연산자 호출 즉시종단 연산자 호출 시
중간 결과새로운 컬렉션으로 생성중간 결과 없음 (지연 처리)
  • 즉시 평가(Eager Evaluation) : 연산자의 호출이 일어날 때마다 즉시 전체 데이터를 처리하고, 중간 결과를 새로운 컬렉션으로 만듭니다.
  • 지연 평가(Lazy Evaluation) : 최종 결과가 필요할 때까지 연산을 미루며, 실제로 결과를 사용할 때만 연산을 실행합니다.

이와 관련하여 이전의 시퀀스 활용 코드를 다시 살펴보겠습니다.

val result = (1..1_000_000).asSequence()
    .map { 
        println("map: $it")
        it * 2 
    }
    .filter { 
        println("filter: $it")
        it % 3 == 0 
    }
    .take(3)
    .toList()

지연 처리(Lazy Evaluation) 방식을 따르므로 다음과 같이 처리됩니다:

  1. mapfiltertake가 각 원소마다 순차적으로 적용됩니다.
  2. 1부터 시작하여 하나씩 2배로 만들고, 3의 배수인지 확인하고, 상위 5개가 모이는 즉시 중단합니다.
  3. 전체를 순회하지 않고, 필요한 개수만큼만 연산합니다.

하나의 요소에 대해 모든 연산자가 순차적으로 적용되고, 그 다음 요소로 넘어갑니다.

즉, 하나의 값이 전 처리 과정을 거친 후 다음 값으로 넘어가는 방식으로 처리됩니다. 비유하자면, 마치 파이프라인을 보는 듯한 느낌을 받을 수 있습니다.

스트림 처리(Streaming)

시퀀스는 중간 리스트를 만들지 않고 각 요소를 하나씩 순차적으로 처리합니다. 이 덕분에 아래와 같은 이점이 생깁니다:

  • 중간 리스트를 생성하지 않으므로 메모리 효율성 향상
  • 전체 데이터를 만들지 않고 필요한 만큼만 연산하므로 성능 향상
  • 필요한 부분만 처리하고 조기 종료 가능
val result = (1..1_000_000).asSequence()
    .map { it * 2 }
    .filter { it % 3 == 0 }
    .take(5)
    .toList()

map: 1
filter: 2
map: 2
filter: 4
map: 3
filter: 6
map: 4
filter: 8
map: 5
filter: 10
map: 6
filter: 12
map: 7
filter: 14
map: 8
filter: 16
map: 9
filter: 18
[6, 12, 18]

위 코드의 연산 결과를 다시 살펴보면, 실제로는 100만 개를 모두 연산하지 않습니다. 조건을 만족하는 5개 항목만 찾으면 그 즉시 중단됩니다.

중간 연산자와 종단 연산자의 존재

Sequence에서는 중간 연산자와 종단 연산자가 구분되어 있는데, 지연 처리를 가능하도록 하는 큰 특징입니다.

연산자 종류예시설명
중간 연산자map, filter, take, drop, distinct새로운 시퀀스를 반환. 실제 연산은 미수행
종단 연산자toList(), forEach, count, first, last시퀀스를 소비하며 실제 연산 수행 시작

실제로 아래 코드를 실행해보면, 종단 연산자를 호출하지 않았습니다. 따라서 의도한 작업들이 수행되지 않으며, 연산이 수행된 결과 역시 확인할 수 없습니다.

val result = listOf(1, 2, 3).asSequence()
    .map { println("map: $it"); it * 2 }

println(result)

kotlin.sequences.TransformingSequence@66a3ffec

아래 코드처럼 toList()와 같은 종단 연산자를 호출해야 map 내부가 실행됩니다:

val result = listOf(1, 2, 3).asSequence()
    .map { println("map: $it"); it * 2 }
    .toList()

println(result)

map: 1
map: 2
map: 3
[2, 4, 6]


코드 살펴보기

시퀀스를 구현한 코드를 상향식으로 알아봅니다.

Iterator

앞서 컬렉션과 마찬가지로 시퀀스 역시 Iterator 기반으로 동작한다고 설명하였습니다.

Iterator는 컬렉션, 시퀀스의 각 원소들을 순회하기 위하여 사용되는 수단입니다.

public interface Iterator<out T> {
    public operator fun next(): T
    public operator fun hasNext(): Boolean
}

코틀린에서 Iterator는 인터페이스로서, 두 가지 기능을 가지고 있습니다.

  • next() : 다음 원소를 반환하되, 더 순회할 원소가 없다면 NoSuchElementException를 발생시킨다.
  • hasNext() : 순회할 수 있는 원소가 더 있다면 'true'를 반환한다.

참고)
두 함수 모두 operator 함수로 구현되어 있다는 것이 특징입니다. 이러한 특성 덕분에 for (x in y) 구문에서 내부적으로 y.iterator()를 호출하고, 반환된 Iterator에 대해 hasNext()next()를 반복 호출할 수 있도록 합니다. 물론 ****두 함수 이름을 직접적으로 호출하여 기능들을 활용할 수도 있습니다.

Sequence Interface

시퀀스 인터페이스를 살펴보도록 하겠습니다. Iterator를 반환하는 iterator() 함수로부터, Iterator 기반으로 동작한다는 특성을 다시 한 번 확인할 수 있습니다. 해당 Iterator는 시퀀스로부터 각 원소를 가져오는 역할을 수행하게 됩니다.

public interface Sequence<out T> {
    /**
     * 한 번만 호출 가능하며, 두 번째 호출될 때는 예외를 던지도록 구현해야 한다.
     */
    public operator fun iterator(): Iterator<T>
}

Sequence function

그것을 구현한 기본적인 형태의 시퀀스를 살펴보겠습니다. 가짜 생성자(Fake Constructor)를 활용한 구현에서, 시퀀스에 필요한 iterator()를 호출하여 가져오는 것을 확인할 수 있습니다.

@kotlin.internal.InlineOnly
public inline fun <T> Sequence(crossinline iterator: () -> Iterator<T>): Sequence<T> = object : Sequence<T> {
    override fun iterator(): Iterator<T> = iterator()
}

iterator()는 Iterator를 반환하는 람다 함수를 통해 가져오고 있습니다.

Sequence 함수가 활용되는 코드를 통해 iterator를 어떻게 정의하고 있는지 알아보겠습니다.

public fun <T> sequence(
    @BuilderInference block: suspend SequenceScope<T>.() -> Unit
): Sequence<T> = Sequence { iterator(block) }

이 함수는 SequenceScope에서 값을 yield()로 제공받는 suspend 블록을 통해 시퀀스를 만듭니다.

핵심은 suspend 람다를 iterator() 함수의 매개변수로 전달하는 형태로 시퀀스로 만든다는 것입니다. Sequence 구현에 코루틴이 활용되고 있다는 것이 주목할 만한 사실입니다.

여기서 시퀀스를 정의하기 위해 또 다른 iterator() 함수를 활용하여 이터레이터를 정의하고 있는데, 그 구현체도 살펴보겠습니다.

iterator function

public fun <T> iterator(
    @BuilderInference block: suspend SequenceScope<T>.() -> Unit
): Iterator<T> {
    val iterator = SequenceBuilderIterator<T>()
    iterator.nextStep = block.createCoroutineUnintercepted(receiver = iterator, completion = iterator)
    return iterator
}

iterator()에서는 SequenceBuilderIterator라는 특수한 형태의 Iterator를 활용하고 있습니다. 해당 코루틴의 실행 상태를 관리하는 역할을 수행합니다. 값이 필요할 때마다(lazy) block 부분을 수행하며 값을 가져올 수 있게 됩니다.

SequenceBuilderIterator

시퀀스 내부 원리의 핵심이 되는 SequenceBuilderIterator를 살펴보겠습니다.

private class SequenceBuilderIterator<T> :
    SequenceScope<T>(), Iterator<T>, Continuation<Unit> {
    // ...
}

해당 클래스는 세 가지를 상속/구현하고 있습니다. 각각의 주요 기능은 다음과 같습니다.

역할설명
SequenceScopeyield(), yieldAll()로 원소 생산
IteratorhasNext()next()로 순회 지원
Continuation코루틴 실행 맥락(Context) 유지

SequenceScope에서 yield()yieldAll()로 데이터 생산하기

추상 클래스로서 SequenceScope는 다음과 같은 구성을 보입니다.

public abstract class SequenceScope<in T> internal constructor() {
    /**
     * Yields a value to the [Iterator] being built and suspends
     * until the next value is requested.
     *
     * @sample samples.collections.Sequences.Building.buildSequenceYieldAll
     * @sample samples.collections.Sequences.Building.buildFibonacciSequence
     */
    public abstract suspend fun yield(value: T)

    /**
     * Yields all values from the `iterator` to the [Iterator] being built
     * and suspends until all these values are iterated and the next one is requested.
     *
     * The sequence of values returned by the given iterator can be potentially infinite.
     *
     * @sample samples.collections.Sequences.Building.buildSequenceYieldAll
     */
    public abstract suspend fun yieldAll(iterator: Iterator<T>)

    /**
     * Yields a collections of values to the [Iterator] being built
     * and suspends until all these values are iterated and the next one is requested.
     *
     * @sample samples.collections.Sequences.Building.buildSequenceYieldAll
     */
    public suspend fun yieldAll(elements: Iterable<T>) {
        if (elements is Collection && elements.isEmpty()) return
        return yieldAll(elements.iterator())
    }

    /**
     * Yields potentially infinite sequence of values  to the [Iterator] being built
     * and suspends until all these values are iterated and the next one is requested.
     *
     * The sequence can be potentially infinite.
     *
     * @sample samples.collections.Sequences.Building.buildSequenceYieldAll
     */
    public suspend fun yieldAll(sequence: Sequence<T>): Unit = yieldAll(sequence.iterator())
}

주목할만 한 점은 yield와 yieldAll이 suspend function으로 정의되어 있다는 것입니다.

즉, 시퀀스에서는 내부적으로 코루틴을 활용하고 있음을 알 수 있습니다.

SequenceBuilderIterator 측에서 yield와 yieldAll은 다음과 같이 구현되어 있습니다.

  • 특정 값을 생산하고 코루틴을 일시 중단합니다.
  • 이후 next()가 호출되면 이어서 다음 값을 계산합니다.
override suspend fun yield(value: T) {
    nextValue = value
    state = State_Ready
    return suspendCoroutineUninterceptedOrReturn { c ->
        nextStep = c
        COROUTINE_SUSPENDED
    }
}

override suspend fun yieldAll(iterator: Iterator<T>) {
    if (!iterator.hasNext()) return
    nextIterator = iterator
    state = State_ManyReady
    return suspendCoroutineUninterceptedOrReturn { c ->
        nextStep = c
        COROUTINE_SUSPENDED
    }
}

상태 머신 기반의 hasNext()next()

  • 내부적으로 코루틴이 일시 중단되고 다시 재개되며, 상태 전이를 관리합니다.
  • 값을 yield()하면 nextValue에 저장되고 state = State_Ready로 설정되어 다음 호출에서 꺼낼 수 있게 됩니다.
override fun hasNext(): Boolean {
    while (true) {
        when (state) {
            State_NotReady -> {} // 아직 준비 안됨
            State_Ready -> return true // 준비 완료
            State_Done -> return false // 종료
            ...
        }

        // 상태를 변경하면서 다음 값을 준비
        state = State_Failed
        val step = nextStep!!
        nextStep = null
        step.resume(Unit)
    }
}

구현 방식을 다음과 같이 요약해 볼 수 있습니다.

  • sequence { ... }으로 내부적으로 코루틴을 포함한 Iterator 생성
  • yield()를 통해 순차적으로 값 생성
  • hasNext()next() 호출될 때마다 코루틴이 재개되어 새로운 값 생성
  • 상태 머신으로 지연 처리 + 상태 유지 동시 수행

컬렉션 vs 시퀀스, 어떤 때 사용해야 할까?

메모리 효율을 저해하는 요소를 극복하기 위한

  • 데이터가 매우 많을 때(기준)
  • 일부 데이터만 필요할 때
  • 중간 결과를 저장하지 않아도 될 때

하지만 모든 경우에 Sequence가 정답은 아닙니다. "지연 평가가 정말 필요한가?", "데이터가 정말 많은가?" 등을 먼저 고려해 보아야 합니다. 예를 들어, 아래의 경우에는 시퀀스 활용이 오히려 오버헤드로 이어지며, 컬렉션을 활용하는 것이 유리합니다.

  • 데이터가 적을 때
  • 모든 데이터를 반드시 사용해야 할 때

다음 편에 계속

다음 편에서는 Sequence를 활용한 코드를 읽는 방법을 알아보도록 하겠습니다.

참고 자료

https://kotlinlang.org/docs/sequences.html

profile
Software Engineer

0개의 댓글