WebFlux + Kotlin +Coroutine 스타일로 리팩터링

박태현·2025년 8월 18일
0

통합 예약 프로젝트에서 대기열 등록 과정에서의 코루틴 동작 과정에 대해 살펴보려합니다.

CoroutineScope와 CoroutineContext


CoroutineScope는 단순히 CoroutineContext를 감싸고 제공하는 역할이고, 실제 핵심 정보는 CoroutineContext 안에 들어 있는 요소들임

[ CoroutineContext의 대표적인 요소 ]

  • Job : 현재 코루틴의 생명주기( 취소/완료 ) 관리
  • ContinuationInterceptor ( == Dispatcher ) : 어떤 스레드/스레드풀에서 실행될지 결정
  • CoroutineName : 디버깅용 이름
  • CoroutineExceptionHandler : 예외 처리 핸들러

register 과정 ( 대기열 등록 과정 )에서의 scope를 확인해보자


[ 각 코루틴이 실행되는 시점의 컨텍스트 정보를 출력하는 디버깅용 함수 ]

suspend fun logScope(name: String, scope: CoroutineScope? = null) {
    val ctx = scope?.coroutineContext ?: coroutineContext

    // Job을 제외한 Context ( 부모 스코프를 기준으로 확인하기 위해 )
    // Job을 포함 한다면 각 코루틴은 독립적인 Job을 가지기에 항상 달라지므로 Job을 제외하고 계산해야 함 
    val ctxWithoutJob = ctx.minusKey(Job)

    println(
        """
        [$name]
        Scope(hashWithoutJob): ${ctxWithoutJob.hashCode()}
        Full Scope(hash): ${ctx.hashCode()}
        Job: ${ctx[Job]}
        Dispatcher: ${ctx[ContinuationInterceptor]}
        CoroutineName: ${ctx[CoroutineName]}
        Thread: ${Thread.currentThread().name}
        """.trimIndent()
    )
}

[ register 과정에서의 코드 흐름 ]

...
@PostMapping("/register/{userId}/{queueType}")
	suspend fun registerUser(
		@PathVariable("userId") userId: String,
		@PathVariable("queueType") queueType: String,
		request: ServerHttpRequest
	): ResponseEntity<String> {

		logScope("register01")
		...
}

---

suspend fun register(
	...
): ResponseEntity<String> {

	return redisLockUtil.acquireLockAndRun("$userId:$queueType:queueing")
		{ idempotencyService.execute(
			key = idempotencyKey,
			url = "/queue/register",
			method = "POST",
		) { registerUserToWaitQueue(userId, queueType, enterTimestamp) }
	}
}

---

suspend fun execute(
	...
): ResponseEntity<String> = withContext(Dispatchers.IO) {
	...
}

---

suspend fun registerUserToWaitQueue(
 ...
): String {

	logScope("register02")

	coroutineScope {
		logScope("coroutineScope 내부")

		val waitTime = measureTimeMillis {

		// 두 작업을 병렬로 실행
		val inWaitDeferred = async { searchUserRanking(userId, queueType, "wait") }
		val inAllowDeferred = async { searchUserRanking(userId, queueType, "allow") }
		...
}

---

suspend fun searchUserRanking(
	...
): Long {

	logScope("register-search")
	...
}

[ 로그 결과 ]

[register01]
Scope(hashWithoutJob): -1377992022 🟩
Full Scope(hash): -1297004250
Dispatcher: Dispatchers.Unconfined
Thread: parallel-1

[register02]
Scope(hashWithoutJob): -1315430893 🟦
Full Scope(hash): -314659329
Dispatcher: Dispatchers.IO
Thread: DefaultDispatcher-worker-1

[coroutineScope 내부]
Scope(hashWithoutJob): -1315430893 🟦
Full Scope(hash): -189858013
Dispatcher: Dispatchers.IO
Thread: DefaultDispatcher-worker-1

[register-search]
Scope(hashWithoutJob): -1315430893 🟦
Full Scope(hash): -1265994385
Dispatcher: Dispatchers.IO
Thread: DefaultDispatcher-worker-3

[register-search]
Scope(hashWithoutJob): -1315430893 🟦
Full Scope(hash): 349662602
Dispatcher: Dispatchers.IO
Thread: DefaultDispatcher-worker-2

새로운 Job이 만들어진다 == 새로운 코루틴이 실행된다

aunch {} 나 async {}를 쓰면 항상 새 Job이 생기고, 따라서 새로운 코루틴이 실행되며, Job이 다르면 반드시 다른 코루틴임

withContext(context) { ... }

새로운 스코프를 만드는 게 아니고, 현재 코루틴( Context )을 바탕으로 새로운 Context를 합성해서 그 블록을 실행하는 구조

즉, 부모 Context를 받지만 변경할 수 있으며, 새로운 Job을 만드는 게 아니라, 기본적으로 부모 Job을 그대로 사용

⇒  ”부모 코루틴 안에서 일시 중단( suspend ) → 다른 Context에서 실행 → 다시 복귀”하는 과정을 거침

⇒ 같은 코루틴 ( job )에서 실행됨

coroutineScope {} , supervisorScope {}

새로운 스코프 ( Context )를 만드는 것이 아니라 새로운 job만 만들고 부모 context를 그대로 상속하는 구조

즉, 새로운 job을 생성하여 동작하며 이 job을 부모 컨텍스트에 붙여서 실행하는 구조

[ 구조 ]

CoroutineScope (Spring Request scope, Reactor bridge)
└── Job1 (MonoCoroutine, registerUser 실행)
    └── (withContext(Dispatchers.IO)) // 같은 Job1, Dispatcher만 변경
        └── registerUserToWaitQueue 실행 ( Job1 실행됨 )
            └── Job2 (ScopeCoroutine, coroutineScope 블록)
                ├── Job3 (DeferredCoroutine, async { searchUserRanking(wait) })
                └── Job4 (DeferredCoroutine, async { searchUserRanking(allow) })

부가적인 설명


suspend fun registerUserToWaitQueue(
	userId: String,
	queueType: String,
	enterTimestamp: Long
): String {

	coroutineScope {
		// 두 작업을 병렬로 실행
		val inWaitDeferred = async { searchUserRanking(userId, queueType, "wait") }
		val inAllowDeferred = async { searchUserRanking(userId, queueType, "allow") }
	
		val inWait = inWaitDeferred.await()
		val inAllow = inAllowDeferred.await()
	
	
		if (inWait != -1L || inAllow != -1L) {
			throw ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.ALREADY_REGISTERED_USER)
		}
		
		log.info { "wait : $waitTime" }
	}
	...
}

supervisorScope를 사용한다면 한 자식이 실패하더라도 다른 자식들은 취소되지 않고 끝까지 실행됨

여기서, scope 자체는 실패한 자식의 예외를 수집하여 블록이 끝날 때 전파하기에 모든 자식 코루틴 작업들이 실행되게 됨

하지만, coroutineScope는 한 자식 코루틴이 실패할 시 나머지도 전부 즉시 취소하고, 바로 예외를 전파

따라서, coroutineScope를 사용한다면 리소스 낭비를 줄일 수 있으므로 이를 사용

⭐️⭐️

일반적으로 suspend fun을 그냥 호출한다고 해서 Scope가 생성되는 것은 아니지만

, 요청 핸들러( @PostMapping suspend fun )는 프레임워크가 실행 된다면, 요청마다 새로운 CoroutineScope(Job)가 만들어지고 그 안에서 핸들러 suspend 함수가 실행됨

즉, 요청으로 받는 suspend 함수는 scope가 생성되어 그 안에서 실행됨 ( Spring Webflux가 만들어주는 것 )

따라서, 요청 핸들러 함수 내에서 별도의 코루틴 스코프를 명시적으로 생성하지 않아도 됨

profile
꾸준하게

0개의 댓글