코루틴을 활용한 Step 기반 비동기 WorkFlow 처리

Aryumka·2025년 2월 16일

Why 코루틴?

토요일에 열린 Kotlin Backend Meetup을 다녀와 코틀린의 매력적인 기능들을 사용해 코딩하고 싶다는 생각이 들었습니다.

특히 코루틴은 평소에 잘 다뤄보지 않아서 평소 만들어보고 싶던 WorkFlow방식 설계를 코루틴을 통해 구현해보고 싶었습니다.

WorkFlow란?

WorkFlow는 여러 개의 Step을 조합하여 일련의 작업을 수행하는 방식입니다.

  • 각 Step은 독립적으로 실행될 수 있으며, 성공/실패 여부를 반환합니다.
  • WorkFlow는 여러 Step을 조합하여 전체 프로세스를 관리합니다.
  • 비동기 실행을 활용하면 WorkFlow가 보다 빠르고 효율적으로 동작할 수 있습니다.

예: 파일을 읽고, 데이터를 변환하고 저장. 또는 여러가지 종류의 API 요청을 보내고 취합

WorkFlow 설계

모든 Step의 실행 결과를 표현하기 위해 Result 클래스를 정의합니다.

sealed class Result<T> {
    data class Success<T>(val value: T) : Result<T>()
    data class Failure<T>(val reason: String) : Result<T>()
}

WorkFlowWorkFlowExecutor를 분리했습니다.
WorkFlow는 동기 또는 비동기 실행 시 각각 다르게 하기 위해서 입니다. 먼저 Executor 인터페이스를 만들어줍니다.

sealed interface WorkFlowExecutor {
  fun execute(workFlow: WorkFlow)
  fun retry(): List<Step>

일부러 중간에 첫번째 Step에 delay를 걸어주었습니다.
비동기적으로 Step을 실행하면서 실패한 Step을 failQueue에 저장합니다.

  data object SequentialExecutor : WorkFlowExecutor {
    private val failQueue = mutableListOf<Step>()

    override fun execute(workFlow: WorkFlow) {
      workFlow.steps.forEach { step ->
        when (step.execute()) {
          is Result.Success -> println("workFlow 진행 중. 현재 완료 스텝: $step")
          is Result.Failure -> {
            failQueue.add(step)
            println("Step 실패: $step")
            return@forEach
          }
        }
      }
    }

    override fun retry(): List<Step> = TODO("멈춘 데부터 retry")
  }

  data object AsyncExecutor : WorkFlowExecutor {
    private val failQueue = mutableListOf<Step>()

    override fun execute(workFlow: WorkFlow) =
      runBlocking {
        failQueue.clear()

        val resultChannel = Channel<Pair<Step, Result<Step>>>()

        val jobs =
          workFlow.steps.map { step ->
            async(step.stepType.dispatcher) {
              if (step.stepType == StepType.A) {
                delay(5000)
              }
              val result = step.execute()
              resultChannel.send(step to result)
            }
          }

        launch {
          repeat(workFlow.steps.size) {
            val (step, result) = resultChannel.receive()
            when (result) {
              is Result.Failure -> {
                println("Step 실패: $step")
                failQueue.add(step)
              }
              is Result.Success -> println("✅ WorkFlow 진행 중. 현재 완료 스텝: $step")
            }
          }
          resultChannel.close()
        }

        jobs.awaitAll()

        val remaining = retry()
        println(
          message =
            when {
              remaining.isNotEmpty() -> "실패한 Step이 있습니다: ${remaining.joinToString(",")}"
              else -> "workFlow 완료"
            },
        )
      }

    override fun retry(): List<Step> =
      runBlocking {
        val retryJobs =
          failQueue.map { step ->
            async(step.stepType.dispatcher) {
              step.execute()
            }
          }
        retryJobs.awaitAll()
        emptyList()
      }
  }
}

이제 이를 실행해보면

fun sendRequest() {
  val request = Request(UUID.randomUUID())
  println("Sending request: $request")

  WorkFlowExecutor.AsyncExecutor.execute(WorkFlow.Personal)

  println("Completed PersonalWorkFlow")

  WorkFlowExecutor.SequentialExecutor.execute(WorkFlow.Business)

  println("Completed BusinessWorkFlow")

  println("All WorkFlows completed")
}

아래와 같이 프로세스가 실행되고 종료됩니다.

Sending request: Request(id=64fe9fbe-47fc-4468-82dd-25eef8c11f65)
Sending step: Step(B)
Sending step: Step(C)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(B)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(C)
Sending step: Step(A)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(A)
workFlow 완료
Completed PersonalWorkFlow
Sending step: Step(D)
workFlow 진행 중. 현재 완료 스텝: Step(D)
Sending step: Step(E)
workFlow 진행 중. 현재 완료 스텝: Step(E)
Completed BusinessWorkFlow
All WorkFlows completed

Process finished with exit code 0

결론

  1. WorkFlow를 Step 기반으로 설계하는 이유

1.1 유지보수성이 뛰어남
• 하나의 거대한 프로세스 대신 작은 Step 단위로 작업을 나누면 관리가 쉬워짐.
• 새로운 Step을 추가하거나 수정할 때 전체 코드를 수정할 필요 없이 모듈화된 Step만 변경하면 됨.

1.2 비동기 성능 최적화
• async와 awaitAll()을 활용하여 모든 Step을 병렬로 실행하면 실행 속도가 대폭 향상됨.
• CPU 작업과 IO 작업을 적절히 분리(CpuBound, IoBound 활용)하여, 자원을 효율적으로 사용할 수 있음.

1.3 자동 재시도를 통한 안정성 확보
• 특정 Step이 실패하더라도 WorkFlow 전체가 중단되지 않고, 최대 3번까지 재시도를 진행할 수 있음.
• 이를 통해 네트워크 오류, 일시적인 장애 등의 문제를 극복하고 보다 견고한 시스템을 구축할 수 있음.

1.4 가독성 및 확장성 증가
• WorkFlow의 진행 상태를 실시간으로 출력하며, 성공한 Step과 실패한 Step을 명확히 구분할 수 있음.
• 추가적인 모니터링 기능(예: 진행률 표시, 로그 기록 등)을 쉽게 확장 가능.

  1. Step 기반 비동기 WorkFlow의 실용적인 활용 사례

2.1 데이터 처리 파이프라인
• Step 1: 데이터 읽기 (IO 작업)
• Step 2: 데이터 변환 (CPU 작업)
• Step 3: 변환된 데이터 저장 (IO 작업)
• Step 4: 결과를 다른 서비스로 전송 (네트워크 IO 작업)
• 병렬 실행 + 실패 시 자동 재시도로 안정적인 데이터 처리 가능

2.2 파일 업로드 & 변환 시스템
• Step 1: 사용자로부터 파일 업로드 받기
• Step 2: 파일을 특정 형식으로 변환 (CPU 연산)
• Step 3: 변환된 파일을 클라우드 스토리지에 업로드
• 각 Step을 병렬 실행하면 대량의 파일을 효율적으로 처리 가능

2.3 대량 API 호출 시스템
• Step 1: 사용자 요청을 수집
• Step 2: 외부 API를 병렬로 호출하여 데이터 수집
• Step 3: 결과 데이터를 가공하여 저장
• 비동기 API 호출을 활용하면 수백~수천 개의 요청을 빠르게 처리 가능

profile
아륨까라고 읽습니다.

0개의 댓글