
코루틴을 활용하면, 비동기 작업을 쉽고 효율적으로 처리할 수 있습니다. 코루틴은 분명 개발자에게 큰 편리함을 제공하지만, 실제 개발 환경에서는 그보다 훨씬 복잡한 요구사항들이 존재합니다. 따라서 상황에 따라 코루틴만으로는 모든 문제를 해결하기 어려울 때도 있습니다. 따라서 코루틴을 기반으로 하면서도 복잡한 흐름을 더 쉽게 다룰 수 있는 다른 도구들이 필요한데, 본 시리즈에서는 그런 어려움을 해결해줄 수 있는 수단을 살펴봅니다.
이 글을 원활히 이해하기 위해, 코루틴과 데이터 스트림 관련 사전 지식이 필요합니다.
코루틴을 활용해 비동기적으로 여러 차례 데이터를 공급해야 한다면?
코루틴도 결국 본질적으로 함수입니다. 함수는 일반적으로 다음과 같은 흐름을 따릅니다.
(입력 →) 처리 → 출력
일반적인 함수는 결국 한 번 호출하면 한 번의 결과만 반환하는 구조입니다. 코루틴 역시 중단되었다가 다시 실행될 수는 있지만, 입력 및 처리 과정을 거쳐 한 번 값을 반환한다는 사실은 바뀌지 않습니다.
fun main() = runBlocking {
// Coroutine returning single value!
val value: Deferred<String> = async {
fetchSingleValue(1)
}
println(value.await())
}
suspend fun fetchSingleValue(input: Int): String {
delay(1000) // calculation...
val result = "calculated $input"
return result
}
calculated 1
여러 데이터가 필요한 경우라면?
가장 단순하게, 같은 함수를 여러 차례 호출하는 방식을 생각해볼 수 있습니다.
fun main() = runBlocking {
repeat(3) { index ->
// Coroutine returning single value!
val value: Deferred<String> = async {
fetchSingleValue(index + 1)
}
println(value.await())
}
}
suspend fun fetchSingleValue(input: Int): String {
delay(1000) // calculation...
val result = "calculated $input"
return result
}
calculated 1
calculated 2
calculated 3
이 방식은 호출자 측에서 직접 상태 관리를 해주어야 한다는 번거로움이 존재합니다.
위 예시에서는 반복 횟수(3)와 인덱스 기반으로 활용할 값(index + 1)을 호출자에서 직접 관리하고 있습니다. 이는 데이터 생산에 필요한 컨텍스트를 소비자가 직접 처리하고 있다는 뜻입니다.
한 번의 호출로도 여러 데이터를 제공하고 처리할 수 있는 구조가 좋을 것입니다.
대안으로, 배열 혹은 컬렉션 등의 형태로 반환하는 방법을 생각해 볼 수 있습니다. 리스트에 원하는 만큼의 데이터를 담아 반환하는 방식이라면, 여러 데이터를 한 번에 전달할 수 있기 때문입니다.
suspend fun fetchMultipleValues(size: Int): List<String> {
// calculation!
val result = List(size) { index ->
delay(1000)
"calculated $index"
}
return result
}
데이터를 활용하는 측에서도, 컬렉션 함수를 활용하여 데이터를 처리할 때 선언적으로 가공하는 것이 가능할 것입니다.
// in a coroutine scope or a suspend function
val values: List<String> = fetchMultipleValues(10)
val newValues = values
.map { /* ... */ }
.filter { /* ... */ }
그러나 이 방식에도 한계가 있습니다. 하나의 컬렉션을 완성시키기 위해서는 모든 데이터를 한꺼번에 만들어야 합니다. 모든 연산을 즉시(eagerly) 평가한다는 컬렉션의 특징이 기능 요구 사항에 따라 적절하지 않을 수 있습니다.
예를 들어, 한 원소의 데이터를 처리하는 데 1초라는 시간이 소요된다고 가정해보겠습니다.
// in a coroutine scope or a suspend function
val values: List<String> = fetchMultipleValues(10)
val newValues = values
.map {
// blocking operation(1 sec ~)
asyncWork(it)
}
.filter { /* ... */ }
suspend fun asyncWork(value: String) { /* ... */ }
이 경우, 모든 데이터가 준비되어야만 활용할 수 있는 컬렉션의 특성상 전체 원소들이 처리되기 전까지는 어떤 결과도 얻을 수 없습니다. 이로 인해 초기 응답 지연(latency)이 발생하게 됩니다. 사용자 경험이 중요한 UI 환경이나, 실시간성이 요구되는 상황에서는 이러한 지연이 치명적인 단점이 될 수 있습니다.
예를 들어 아래의 경우들과 같이 각 원소가 만들어지는 즉시 활용되어야만 하는 경우에도 컬렉션을 활용하면, 소비하는 측에 처음으로 데이터가 전달되기까지 많은 시간이 걸릴 수 있습니다.
이 문제를 해결하기 위해, 데이터 스트림을 활용 해볼 수 있습니다.
실제로 Kotlin에서는 초창기(버전 1.0)부터 이러한 작업을 수행할 수 있도록, Java의 스트림과 유사한 시퀀스(Sequence) 라는 개념이 도입되어 있었습니다. 시퀀스는 데이터를 지연 계산(lazy evaluation) 하면서, 하나씩 순차적으로 처리할 수 있게 해줍니다. 덕분에 전체 데이터를 모두 메모리에 올려둘 필요 없이, 필요할 때마다 하나씩 가져와 처리할 수 있습니다.
// in a coroutine scope or a suspend function
val values: List<String> = fetchMultipleValues(10)
val newValues = values
.asSequence() // convert List into Sequence
.map { /* ... */ }
.filter { /* ... */ }
하지만 현재 상황에서 시퀀스를 그대로 활용할 수는 없는데, 바로 동기(synchronous) 흐름을 기반으로 구현되어 있기 때문입니다.
시퀀스는 내부적으로 데이터를 생산하고 소비하는 과정이 동기적으로 이루어지도록 설계되었습니다. 따라서 시퀀스의 각 원소를 생성, 변형, 소비하는 과정에서는 비동기 작업을 수행할 수 없습니다.
따라서 아래와 같이 Sequence의 원소 처리 과정에서 비동기 작업을 시도하면 컴파일 오류가 발생하게 됩니다.
// in a coroutine scope or a suspend function
val values: List<String> = fetchMultipleValues(10)
val newValues = values
.asSequence()
.map {
// compile error!
// asyncWork(it)
}
.filter { /* ... */ }
프로그램 개발 중 다음과 같은 상황을 마주했다고 가정해 보겠습니다.
데이터 생산의 끝이 정해져 있지 않고, 생산된 데이터의 실시간 처리 및 활용이 필요한 상황
데이터를 활용하는 과정에서 비동기 작업이 필요한 상황
만약 컬렉션과 시퀀스를 활용한다면, 다음과 같은 어려움을 겪게 될 것입니다.
이 문제들을 극복하기 위해 다음 두 가지 조건을 만족하는 새로운 접근 방식이 필요합니다.
비동기 작업, 지연 계산이 모두 수행 가능한 특성이 적용된 데이터 스트림은 어떻게 구현할 수 있을지, 이어질 내용에서 살펴보도록 하겠습니다.
콘솔 기반의 간단한 프로그램을 직접 만들어보며 비동기 데이터 스트림을 이해해 봅시다. 일상 생활 속, 우리가 정류장에서 버스를 기다릴 때 확인하는 도착 정보 시스템을 개발해봅시다.
[x] 도착 정보는 노선명, 남은 시간으로 구성되어 있다.
[x] 남은 시간은 초 단위이다.
[x] 초기에 남은 시간은 15초에서 20초 사이 중 랜덤이다.
[x] 콘솔에 도착 정보를 표출한다.
[x] 도착 정보는 최대 15개 보유할 수 있다.
[x] 남은 시간 기준 오름차순으로 상위 5개 노선명만 중복 없이 표출한다.
[x] 남은 시간 기준 오름차순으로 모든 노선 정보를 표출한다. 중복된 노선명이 있어도 그대로 표출한다.
[x] 3초마다 도착 정보를 갱신하여 업데이트한다.
[x] 도착 정보 갱신 시, 갱신된 시점을 표출한다.
[x] 남은 시간이 0이 되면 해당 노선은 목록에서 더 이상 표출되지 않는다.
노선명과 도착 시간으로 이뤄진 클래스를 생성합니다.
data class Arrival(
val routeName: String,
val timeLeft: Int, // sec
)
도착 정보를 생성하는 객체를 구현합니다.
도착 정보 리스트는 별도의 정렬이 이뤄지지 않은 상태에서 불러오도록 구현하도록 하겠습니다.
class ArrivalInfoSource() {
// 사용될 노선 이름들
private val routes = mutableListOf<String>("123", "147", "M5", "M10", "M27")
fun fetch(size: Int): List<Arrival> = List(size) {
create()
}
private fun create(): Arrival {
routes.shuffle()
val routeName = routes.first()
val timeLeft = Random.nextInt(15..20)
return Arrival(
routeName = routeName,
timeLeft = timeLeft,
)
}
}
시퀀스의 한계를 극복하기 위해, 비동기 기반 데이터 스트림을 구현해보도록 하겠습니다.
데이터를 생산하고 소비하는데 필요한 인터페이스부터 살펴보겠습니다.
interface DataProducer<T> {
suspend fun produce(element: T)
}
interface DataCustomer<T> {
suspend fun consume(): T
}
DataProducer는 데이터를 받아서 처리하는 역할을 담당합니다. produce() 함수는 suspend 키워드를 통해 비동기 작업을 수행할 수 있으며, 전달받은 원소를 가공하거나 저장하는 등의 작업을 수행합니다.
DataConsumer는 데이터를 소비하는 역할을 담당합니다. consume() 함수는 비동기적으로 데이터를 가져와서 반환하며, 이 과정에서도 비동기 작업을 수행할 수 있습니다.
이어서 생산자와 소비자를 이어주는 역할을 하는 StreamPipe를 구현해보도록 하겠습니다.
StreamPipe는 생산자와 소비자 역할도 구현하는데, 파이프 양 끝에 생산자와 소비자가 존재하는 이미지를 떠올려볼 수 있습니다.
기본적인 아이디어는 다음과 같습니다.
produce()를 호출하여 파이프로 데이터를 보내고, 소비하는 측에서 consume()을 호출하여 파이프로부터 데이터를 받는 구조입니다.class StreamPipe<T> : DataProducer<T>, DataCustomer<T> {
private val buffer = ArrayDeque<T>()
private val waitingConsumers = ArrayDeque<CancellableContinuation<T>>()
private val mutex = Mutex()
private val isClosed = AtomicBoolean(false)
override suspend fun produce(element: T) {
if (isClosed.get()) throw IllegalStateException("Channel is closed")
var consumerToResume: CancellableContinuation<T>? = null
mutex.withLock {
if (waitingConsumers.isNotEmpty()) {
consumerToResume = waitingConsumers.removeFirst()
} else {
buffer.addLast(element)
}
}
consumerToResume?.resume(element) { t ->
println("error : $t")
}
}
override suspend fun consume(): T {
return suspendCancellableCoroutine { continuation ->
var resumed = false
val job = CoroutineScope(Dispatchers.Unconfined).launch {
mutex.withLock {
if (buffer.isNotEmpty()) {
if (!resumed) {
resumed = true
continuation.resume(buffer.removeFirst()) { t ->
println("error : $t")
}
}
return@withLock
}
if (isClosed.get()) {
if (!resumed) {
resumed = true
continuation.resumeWithException(IllegalStateException("Channel is closed and empty"))
}
return@withLock
}
waitingConsumers.addLast(continuation)
}
}
continuation.invokeOnCancellation {
job.cancel()
CoroutineScope(Dispatchers.Unconfined).launch {
mutex.withLock {
waitingConsumers.remove(continuation)
}
}
}
}
}
suspend fun close() {
if (!isClosed.getAndSet(true)) {
mutex.withLock {
waitingConsumers.forEach { consumer ->
consumer.resumeWithException(IllegalStateException("Channel is closed"))
}
waitingConsumers.clear()
buffer.clear()
}
}
}
}
최대 15개의 도착 정보를 관리하기 위한 객체입니다.
이 곳에서 모든 도착 정보를 관리하며, 소비자가 파이프로부터 데이터를 받아올 때마다 시간 감소 및 보충 로직, 데이터 정렬 및 상태 관리를 수행합니다.
class ArrivalManager(
private val maxArrivals: Int = 15,
private val arrivals: MutableList<Arrival> = mutableListOf()
) {
private val mutex = Mutex()
private var lastUpdateTime = System.currentTimeMillis()
suspend fun addNewArrival(arrival: Arrival) {
mutex.withLock {
val currentTime = System.currentTimeMillis()
if (currentTime - lastUpdateTime >= 3000) {
updateTimeAndRemoveExpired()
lastUpdateTime = currentTime
}
// 새로운 도착 정보 추가 (최대 15개까지)
if (arrivals.size < maxArrivals) {
arrivals.add(arrival)
} else {
// 가장 오래된 것 제거 후 새로운 것 추가
arrivals.removeFirst()
arrivals.add(arrival)
}
}
}
private fun updateTimeAndRemoveExpired() {
// 기존 도착 정보의 시간을 3초 감소
arrivals.replaceAll { arrival ->
arrival.copy(timeLeft = maxOf(0, arrival.timeLeft - 3))
}
// 시간이 0이 된 항목 제거
arrivals.removeIf { it.timeLeft <= 0 }
}
suspend fun topRoutes(count: Int): List<String> {
return mutex.withLock {
arrivals.sortedBy { it.timeLeft }
.map { it.routeName }
.distinct()
.take(count)
}
}
suspend fun sortedArrivals(): List<Arrival> {
return mutex.withLock {
arrivals.sortedBy { it.timeLeft }
}
}
}
class ArrivalView {
fun displayArrivalInfo(topRoutes: List<String>, allArrivals: List<Arrival>, updateTime: String) {
println("\n" + "=".repeat(50))
println("🚌 버스 도착 정보 (업데이트 시각: $updateTime)")
println("=".repeat(50))
// 상위 5개 노선명 (중복 제거)
println("📍 가장 빨리 도착하는 노선 (상위 5개):")
if (topRoutes.isNotEmpty()) {
topRoutes.forEachIndexed { index, routeName ->
println(" ${index + 1}. ${routeName}번")
}
} else {
println(" 현재 도착 예정인 버스가 없습니다.")
}
println()
// 모든 노선 정보 (중복 포함)
println("🕐 전체 도착 정보:")
if (allArrivals.isNotEmpty()) {
allArrivals.forEach { arrival ->
val minutes = arrival.timeLeft / 60
val seconds = arrival.timeLeft % 60
val timeDisplay = if (minutes > 0) {
"${minutes}분 ${seconds}초"
} else {
"${seconds}초"
}
println(" ${arrival.routeName}번 - $timeDisplay 후 도착")
}
} else {
println(" 현재 도착 예정인 버스가 없습니다.")
}
}
fun displaySystemStart() {
println("🚌 버스 도착 정보 시스템을 시작합니다...")
println("초기 도착 정보 15개를 불러옵니다...")
}
fun displaySystemShutdown() {
println("\n시스템을 종료합니다...")
}
}
class ArrivalProducerController(
private val pipe: StreamPipe<Arrival>,
private val arrivalInfoSource: ArrivalInfoSource,
) {
private var isInitialized = false
suspend fun start() {
// 초기 15개 도착 정보 생성
if (!isInitialized) {
val initialArrivals = arrivalInfoSource.fetch(15)
for (arrival in initialArrivals) {
pipe.produce(arrival)
}
isInitialized = true
}
// 주기적으로 새로운 도착 정보 생성
while (true) {
delay(3000) // 3초마다
try {
// 랜덤하게 1-3개의 새로운 도착 정보 생성
val newArrivals = arrivalInfoSource.fetch(Random.nextInt(1, 4))
for (arrival in newArrivals) {
pipe.produce(arrival)
}
} catch (e: Exception) {
println("데이터 생산자 오류: ${e.message}")
break
}
}
}
}
class ArrivalConsumerController(
private val pipe: StreamPipe<Arrival>,
private val model: ArrivalManager,
private val view: ArrivalView
) {
suspend fun start() {
while (true) {
try {
val arrival = pipe.consume()
// 새로운 도착 정보를 모델에 추가
model.addNewArrival(arrival)
// 주기적으로 화면 업데이트 (3초마다)
displayCurrentState()
} catch (e: Exception) {
println("데이터 소비자 오류: ${e.message}")
break
}
}
}
private suspend fun displayCurrentState() {
val currentTime = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"))
val topRoutes = model.topRoutes(5)
val allArrivals = model.sortedArrivals()
view.displayArrivalInfo(topRoutes, allArrivals, currentTime)
}
}
fun main() {
BusArrivalApplication().run()
}
class BusArrivalApplication {
private val model = ArrivalManager()
private val view = ArrivalView()
private val pipe = StreamPipe<Arrival>()
private val arrivalInfoSource = ArrivalInfoSource()
private val dataProducer = ArrivalProducerController(pipe, arrivalInfoSource)
private val dataConsumer = ArrivalConsumerController(pipe, model, view)
fun run() = runBlocking {
view.displaySystemStart()
val scope = CoroutineScope(Dispatchers.Default)
val producerJob = scope.launch {
dataProducer.start()
}
val consumerJob = scope.launch {
dataConsumer.start()
}
joinAll(producerJob, consumerJob)
}
}
비동기 데이터 스트림 구현을 통해 코루틴 혹은 시퀀스를 활용할 때 발생하는 여러 한계점들을 해결할 수 있었습니다.
StreamPipe로 대기 중인 생산자와 소비자를 큐로 관리하여, 생산자와 소비자 간의 데이터 전달 메커니즘을 구축하였습니다. Mutex를 활용하여 공유 자원에 대한 안전한 접근을 보장했고, 여러 코루틴이 동시에 실행되는 환경에서도 데이터 무결성을 유지할 수 있었습니다. 특히 생산자가 데이터를 생성하는 동안 소비자가 동시에 데이터를 요청하는 상황에서도 경쟁 조건 없이 안전하게 처리할 수 있는 구조를 만들었습니다.
실시간 처리 측면에서는 생산자와 소비자가 완전히 독립적인 코루틴에서 실행되어 실제 시간 기반 처리가 가능해졌습니다. 버스 도착 정보가 실시간으로 업데이트되고 화면에 반영되는 시스템을 구현할 수 있었으며, 데이터가 생산되는 즉시 소비자에게 전달되어 지연 시간을 최소화했습니다.
버퍼 용량 제한 부재로 인한 백프레셔
현재 구현에서는 파이프가 가지고 있는 보유 가능한 정보의 수가 제한이 없습니다. 이로 인해 상황에 따라 성능이 저하될 수 있습니다. 특히 만약 파이프에서 데이터가 생산되는 속도에 비해 소비되는 속도가 현저히 느리다면, 메모리 사용량이 무한정 증가할 수 있습니다. 이러한 백프레셔를 제어하는 로직 또한 존재하지 않는 것도 성능에 악영항을 미칠 수 있습니다.
데이터 가공 과정의 어려움
현재 구현에서는 원시 데이터를 파이프를 통해 그대로 전달하는 구조입니다. 하지만 실제 시스템에서는 데이터를 필터링하거나 변환하는 과정이 필요할 수 있습니다. 예를 들어, 현재 구조에서는 특정 노선만 선별하거나 데이터를 집계하는 등의 중간 처리 과정을 ArrivalManager 측에서 직접 구현하고 있는데, 더욱 복잡한 로직이 생긴다면 그에 대한 관리가 어려워질 수 있습니다.
이번 편에서는 비동기 데이터 스트림을 활용하는 것이 필요한 상황, 그리고 그것을 직접 구현해보며 원리를 살펴보았습니다. 하지만 지금까지 알아본 방식에는 많은 문제점이 존재하는데, 이를 어떻게 해결하는지 다음 편에서 알아보도록 하겠습니다.