대기열 등록 중 취소 시 오류가 발생하는 문제가 발생

프로젝트에는 대기열에서 사용자를 제거하고 참가열로 이동시키는 승격 로직이 존재하는데
승격 로직이 대기열에서 사용자를 삭제한 직후 아직 참가열에 추가하기 전, 거의 동시에 취소 로직이 실행되어 참가열에서 해당 사용자를 취소하면 사용자가 존재하지 않기 때문에 취소 로직에서 오류가 발생하게 됩니다.
suspend fun cancelWaitOrAllowUser(
userId: String,
queueType: String,
queueCategory: String
): ResponseEntity<String> {
try {
if (queueCategory == "wait") {
val waitQueueKey = "$queueType$WAIT_QUEUE"
val removedCount = reactiveRedisTemplate.opsForZSet()
.remove(waitQueueKey, userId)
.awaitSingle()
if (removedCount == 0L) {
throw ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.USER_NOT_FOUND_IN_THE_QUEUE)
}
sendEventToKafka(waitQueueKey, userId, "CANCELED")
return ResponseEntity.ok("대기열 삭제 완료")
} else {
val allowQueueKey = "$queueType$ALLOW_QUEUE"
val allowRemovedCount = reactiveRedisTemplate.opsForZSet()
.remove(allowQueueKey, userId)
.awaitSingle()
**// 여기서 문제 상황에 대한 에러 발생
if (allowRemovedCount == 0L) {
throw ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.USER_NOT_FOUND_IN_THE_QUEUE)
}**
val ttlRemovedCount = reactiveRedisTemplate.opsForZSet()
.remove(TOKEN_TTL_INFO, userId)
.awaitSingle()
if (ttlRemovedCount == 0L) {
log.warn { "${userId}님의 TTL 키가 존재하지 않아 삭제되지 않았습니다." }
}
return ResponseEntity.ok("참가열 삭제 완료")
}
} catch (e: ReserveException) {
log.error { "예약 취소 중 오류 발생" }
throw e
}
}
이 타이밍 문제를 별도 케이스로 분리하고, 이러한 상황이 발생하면 참가열에서 사용자를 삭제하지 않고 이미 삭제된 것으로 간주하여 응답을 반환하여 오류가 발생하지 않도록 하였습니다.
suspend fun cancelUser(userId: String, queueType: String, queueCategory: String): ResponseEntity<String> {
return when (queueCategory) {
"wait" -> cancelWaitOrAllow(userId, queueType)
"allow" -> cancelAllowUser(userId, queueType)
else -> throw ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.INVALID_QUEUE_CATEGORY)
}
}
/*
* 문제 상황 발생 가능성
*
* 승격 로직이 wait에서 사용자를 삭제하고 allow로 옮기기 전에 취소 로직이 allow에서 삭제를 진행하는 경우
* ⇒ 이러한 타이밍으로 인한 경쟁 상태를 별도로 관리하여 문제를 해결
*/
private suspend fun cancelWaitOrAllow(userId: String, queueType: String): ResponseEntity<String> {
val waitQueueKey = "$queueType$WAIT_QUEUE"
val removedCount = reactiveRedisTemplate.opsForZSet()
.remove(waitQueueKey, userId)
.awaitSingle()
return if (removedCount > 0L) {
sendEventToKafka(waitQueueKey, userId, "CANCELED")
ResponseEntity.ok("대기열 삭제 완료")
} else {
val allowResult = cancelAllowUserForWaitContext(userId, queueType)
allowResult ?: run {
log.info { "이미 삭제가 처리된 사용자" }
ResponseEntity.ok("이미 삭제가 처리된 사용자")
}
}
}
// 경쟁 상태 문제로 인한 별도의 로직
private suspend fun cancelAllowUserForWaitContext(
userId: String, queueType: String
): ResponseEntity<String>? {
val allowQueueKey = "$queueType$ALLOW_QUEUE"
val allowRemovedCount = reactiveRedisTemplate.opsForZSet()
.remove(allowQueueKey, userId)
.awaitSingle()
// 승격 중 타이밍 문제로 간주
if (allowRemovedCount == 0L) return null
removeTtlKey(userId)
return ResponseEntity.ok("참가열 삭제 완료")
}
suspend fun cancelAllowUser(
userId: String,
queueType: String,
): ResponseEntity<String> {
try {
val allowQueueKey = "$queueType$ALLOW_QUEUE"
val allowRemovedCount = reactiveRedisTemplate.opsForZSet()
.remove(allowQueueKey, userId)
.awaitSingle()
if (allowRemovedCount == 0L) {
throw ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.USER_NOT_FOUND_IN_THE_QUEUE)
}
removeTtlKey(userId)
return ResponseEntity.ok("참가열 삭제 완료")
} catch (e: ReserveException) {
log.error { "예약 취소 중 오류 발생" }
throw e
}
}