토요일에 열린 Kotlin Backend Meetup을 다녀와 코틀린의 매력적인 기능들을 사용해 코딩하고 싶다는 생각이 들었습니다.
특히 코루틴은 평소에 잘 다뤄보지 않아서 평소 만들어보고 싶던 WorkFlow방식 설계를 코루틴을 통해 구현해보고 싶었습니다.
WorkFlow는 여러 개의 Step을 조합하여 일련의 작업을 수행하는 방식입니다.
예: 파일을 읽고, 데이터를 변환하고 저장. 또는 여러가지 종류의 API 요청을 보내고 취합
모든 Step의 실행 결과를 표현하기 위해 Result 클래스를 정의합니다.
sealed class Result<T> {
data class Success<T>(val value: T) : Result<T>()
data class Failure<T>(val reason: String) : Result<T>()
}
WorkFlow와 WorkFlowExecutor를 분리했습니다.
WorkFlow는 동기 또는 비동기 실행 시 각각 다르게 하기 위해서 입니다. 먼저 Executor 인터페이스를 만들어줍니다.
sealed interface WorkFlowExecutor {
fun execute(workFlow: WorkFlow)
fun retry(): List<Step>
일부러 중간에 첫번째 Step에 delay를 걸어주었습니다.
비동기적으로 Step을 실행하면서 실패한 Step을 failQueue에 저장합니다.
data object SequentialExecutor : WorkFlowExecutor {
private val failQueue = mutableListOf<Step>()
override fun execute(workFlow: WorkFlow) {
workFlow.steps.forEach { step ->
when (step.execute()) {
is Result.Success -> println("workFlow 진행 중. 현재 완료 스텝: $step")
is Result.Failure -> {
failQueue.add(step)
println("Step 실패: $step")
return@forEach
}
}
}
}
override fun retry(): List<Step> = TODO("멈춘 데부터 retry")
}
data object AsyncExecutor : WorkFlowExecutor {
private val failQueue = mutableListOf<Step>()
override fun execute(workFlow: WorkFlow) =
runBlocking {
failQueue.clear()
val resultChannel = Channel<Pair<Step, Result<Step>>>()
val jobs =
workFlow.steps.map { step ->
async(step.stepType.dispatcher) {
if (step.stepType == StepType.A) {
delay(5000)
}
val result = step.execute()
resultChannel.send(step to result)
}
}
launch {
repeat(workFlow.steps.size) {
val (step, result) = resultChannel.receive()
when (result) {
is Result.Failure -> {
println("Step 실패: $step")
failQueue.add(step)
}
is Result.Success -> println("✅ WorkFlow 진행 중. 현재 완료 스텝: $step")
}
}
resultChannel.close()
}
jobs.awaitAll()
val remaining = retry()
println(
message =
when {
remaining.isNotEmpty() -> "실패한 Step이 있습니다: ${remaining.joinToString(",")}"
else -> "workFlow 완료"
},
)
}
override fun retry(): List<Step> =
runBlocking {
val retryJobs =
failQueue.map { step ->
async(step.stepType.dispatcher) {
step.execute()
}
}
retryJobs.awaitAll()
emptyList()
}
}
}
이제 이를 실행해보면
fun sendRequest() {
val request = Request(UUID.randomUUID())
println("Sending request: $request")
WorkFlowExecutor.AsyncExecutor.execute(WorkFlow.Personal)
println("Completed PersonalWorkFlow")
WorkFlowExecutor.SequentialExecutor.execute(WorkFlow.Business)
println("Completed BusinessWorkFlow")
println("All WorkFlows completed")
}
아래와 같이 프로세스가 실행되고 종료됩니다.
Sending request: Request(id=64fe9fbe-47fc-4468-82dd-25eef8c11f65)
Sending step: Step(B)
Sending step: Step(C)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(B)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(C)
Sending step: Step(A)
✅ WorkFlow 진행 중. 현재 완료 스텝: Step(A)
workFlow 완료
Completed PersonalWorkFlow
Sending step: Step(D)
workFlow 진행 중. 현재 완료 스텝: Step(D)
Sending step: Step(E)
workFlow 진행 중. 현재 완료 스텝: Step(E)
Completed BusinessWorkFlow
All WorkFlows completed
Process finished with exit code 0
1.1 유지보수성이 뛰어남
• 하나의 거대한 프로세스 대신 작은 Step 단위로 작업을 나누면 관리가 쉬워짐.
• 새로운 Step을 추가하거나 수정할 때 전체 코드를 수정할 필요 없이 모듈화된 Step만 변경하면 됨.
1.2 비동기 성능 최적화
• async와 awaitAll()을 활용하여 모든 Step을 병렬로 실행하면 실행 속도가 대폭 향상됨.
• CPU 작업과 IO 작업을 적절히 분리(CpuBound, IoBound 활용)하여, 자원을 효율적으로 사용할 수 있음.
1.3 자동 재시도를 통한 안정성 확보
• 특정 Step이 실패하더라도 WorkFlow 전체가 중단되지 않고, 최대 3번까지 재시도를 진행할 수 있음.
• 이를 통해 네트워크 오류, 일시적인 장애 등의 문제를 극복하고 보다 견고한 시스템을 구축할 수 있음.
1.4 가독성 및 확장성 증가
• WorkFlow의 진행 상태를 실시간으로 출력하며, 성공한 Step과 실패한 Step을 명확히 구분할 수 있음.
• 추가적인 모니터링 기능(예: 진행률 표시, 로그 기록 등)을 쉽게 확장 가능.
2.1 데이터 처리 파이프라인
• Step 1: 데이터 읽기 (IO 작업)
• Step 2: 데이터 변환 (CPU 작업)
• Step 3: 변환된 데이터 저장 (IO 작업)
• Step 4: 결과를 다른 서비스로 전송 (네트워크 IO 작업)
• 병렬 실행 + 실패 시 자동 재시도로 안정적인 데이터 처리 가능
2.2 파일 업로드 & 변환 시스템
• Step 1: 사용자로부터 파일 업로드 받기
• Step 2: 파일을 특정 형식으로 변환 (CPU 연산)
• Step 3: 변환된 파일을 클라우드 스토리지에 업로드
• 각 Step을 병렬 실행하면 대량의 파일을 효율적으로 처리 가능
2.3 대량 API 호출 시스템
• Step 1: 사용자 요청을 수집
• Step 2: 외부 API를 병렬로 호출하여 데이터 수집
• Step 3: 결과 데이터를 가공하여 저장
• 비동기 API 호출을 활용하면 수백~수천 개의 요청을 빠르게 처리 가능