
현재 QueueEventController 클래스에서 발생하는 오류는 status.first, status.second, status.third 속성을 사용하려고 할 때 발생하고 있습니다.
이 오류는 QueueStatus 객체를 Triple 타입으로 잘못 참조하고 있기 때문에 발생합니다.
이전에 QueueStatus를 Triple<Long, Long, Double> 타입 별칭으로 정의했었는데, 지금은 데이터 클래스로 변경되어 first, second, third 대신 정확한 필드 이름(userRank, totalQueueSize, progress)을 사용해야 합니다.
QueueEventController 클래스의 코드를 다음과 같이 수정해야 합니다.
kotlinpackage com.docqueue.domain.flow.controller
import com.docqueue.domain.flow.dto.QueueUpdateEvent
import com.docqueue.domain.flow.service.UserQueueService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactor.asFlux
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import java.time.Duration
@RestController
@RequestMapping("/api/v1/queue")
class QueueEventController(
private val userQueueService: UserQueueService
) {
/**
* Server-Sent Events를 통한 대기열 상태 업데이트 스트리밍
*/
@GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamQueueEvents(
@RequestParam(name = "queue", defaultValue = "default") queue: String,
@RequestParam(name = "user-id") userId: Long
): Flux<QueueUpdateEvent> {
return Flux.interval(Duration.ofSeconds(1))
.flatMap { userQueueService.getQueueStatus(queue, userId) }
.map { status -> QueueUpdateEvent(status.userRank, status.totalQueueSize, status.progress) }
.distinctUntilChanged()
}
/**
* Flow를 활용한 대기열 상태 업데이트 스트리밍
*/
@GetMapping(path = ["/flow-events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamQueueEventsFlow(
@RequestParam(name = "queue", defaultValue = "default") queue: String,
@RequestParam(name = "user-id") userId: Long
): Flux<QueueUpdateEvent> {
val statusFlow: Flow<QueueUpdateEvent> = userQueueService
.getQueueStatusAsFlow(queue, userId)
.map { status -> QueueUpdateEvent(status.userRank, status.totalQueueSize, status.progress) }
return statusFlow.asFlux()
}
}
QueueUpdateEvent 클래스가 아직 정의되지 않은 것 같으므로, 다음과 같이 정의해야 합니다.
kotlinpackage com.docqueue.domain.flow.dto
data class QueueUpdateEvent(
val userRank: Long,
val totalQueueSize: Long,
val progress: Double
)
이렇게 수정하면 status.first, status.second, status.third와 관련된 오류가 해결됩니다.
QueueStatus 데이터 클래스의 필드명을 직접 사용하여 QueueUpdateEvent 객체를 생성하도록 변경했습니다.
이 프로젝트는 Spring WebFlux와 Kotlin 코루틴을 함께 사용하는 반응형 웹 애플리케이션입니다.
다음과 같은 세 가지 주요 문제가 있었습니다.
코루틴과 Reactor 패턴 혼용으로 인한 "Suspension functions can be called only within coroutine body" 오류
kotlin@Configuration
@EnableWebFluxSecurity
class SecurityConfig {
@Bean
fun securityWebFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain {
return http
.csrf().disable()
.authorizeExchange()
.pathMatchers("/api/v1/queue/**").permitAll()
.pathMatchers("/waiting-room").permitAll()
.anyExchange().authenticated()
.and()
.build()
}
}
class UserQueueService(
private val userQueueRepository: UserQueueRepository,
private val tokenGenerator: TokenGenerator
) {
// suspend 함수 내에서 Reactor Mono 사용
suspend fun registerWaitQueue(queue: QueueName, userId: UserId): Mono<WaitingNumber> {
return userQueueRepository.findWaitingOrder(queue)
.flatMap { waitingOrder ->
userQueueRepository.addWaitQueue(queue, userId, waitingOrder + 1)
.map { waitingOrder + 1 }
}
}
// 코루틴과 Reactor 혼용 문제가 있는 메서드
suspend fun registerWaitingQueueOrGetQueueStatus(queue: QueueName, userId: UserId): QueueStatus {
return withContext(Dispatchers.IO) {
userQueueRepository.findUserWaitOrder(queue, userId)
.flatMap { userOrder ->
if (userOrder > 0) {
getQueueStatus(queue, userId)
} else {
registerWaitQueue(queue, userId)
.flatMap { _ -> getQueueStatus(queue, userId) }
}
}
.awaitSingle()
}
}
// 나머지 메소드들...
}
@RequestMapping("/api/v1/queue")
class QueueEventController(
private val userQueueService: UserQueueService
) {
@GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamQueueEvents(...): Flux<QueueUpdateEvent> {
return Flux.interval(Duration.ofSeconds(1))
.flatMap { userQueueService.getQueueStatus(queue, userId) }
.map { status -> QueueUpdateEvent(status.first, status.second, status.third) }
.distinctUntilChanged()
}
// 비슷한 방식으로 다른 메소드에서도 first, second, third 사용
}
Spring Security 6.1 이상에서는 람다 기반의 새로운 API를 사용해야 합니다. csrf().disable(), authorizeExchange(), and() 메소드가 모두 폐지되었습니다.
코루틴의 suspend 함수와 Reactor의 Mono를 함께 사용할 때는 적절한 연결 방법이 필요합니다. suspend 함수 내에서 다른 suspend 함수를 직접 호출할 수 있지만, Reactor의 flatMap 같은 연산자 내에서는 불가능합니다.
QueueStatus가 데이터 클래스로 변경되었으므로, Triple을 사용하는 대신 명시적인 필드 이름을 사용해야 합니다.
@EnableWebFluxSecurity
class SecurityConfig {
@Bean
fun securityWebFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain {
return http
.csrf { csrf -> csrf.disable() }
.authorizeExchange { exchanges ->
exchanges
.pathMatchers("/api/v1/queue/**").permitAll()
.pathMatchers("/waiting-room").permitAll()
.anyExchange().authenticated()
}
.build()
}
}
class UserQueueService(
private val userQueueRepository: UserQueueRepository,
private val tokenGenerator: TokenGenerator
) {
// suspend 키워드 제거
fun registerWaitQueue(queue: QueueName, userId: UserId): Mono<WaitingNumber> {
return userQueueRepository.findWaitingOrder(queue)
.flatMap { waitingOrder ->
userQueueRepository.addWaitQueue(queue, userId, waitingOrder + 1)
.map { waitingOrder + 1 }
}
}
// 코루틴과 Reactor 연결 개선
suspend fun registerWaitingQueueOrGetQueueStatus(queue: QueueName, userId: UserId): QueueStatus {
return withContext(Dispatchers.IO) {
val userOrder = userQueueRepository.findUserWaitOrder(queue, userId).awaitSingle()
if (userOrder > 0) {
getQueueStatus(queue, userId).awaitSingle()
} else {
registerWaitQueue(queue, userId).awaitSingle()
getQueueStatus(queue, userId).awaitSingle()
}
}
}
// 나머지 메소드들...
}
@RequestMapping("/api/v1/queue")
class QueueEventController(
private val userQueueService: UserQueueService
) {
@GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamQueueEvents(...): Flux<QueueUpdateEvent> {
return Flux.interval(Duration.ofSeconds(1))
.flatMap { userQueueService.getQueueStatus(queue, userId) }
.map { status -> QueueUpdateEvent(status.userRank, status.totalQueueSize, status.progress) }
.distinctUntilChanged()
}
// 다른 메소드도 동일하게 변경
}
Spring Security 최신 API 적용: 람다 기반의 더 깔끔하고 타입 안전한 API 사용
코루틴과 Reactor 패턴 명확한 분리
Spring Security 6.1+ API 변경사항
코루틴과 Reactor 통합의 올바른 방법
명확한 타입 사용의 중요성
Spring WebFlux와 Kotlin 코루틴을 함께 사용하는 반응형 프로그래밍은 강력하지만, 두 패러다임을 올바르게 통합하는 것이 중요합니다.
이번 리팩토링을 통해 코드의 안정성을 높이고, 최신 API에 맞게 업데이트했으며, 두 비동기 프로그래밍 모델을 더 명확하게 사용할 수 있게 되었습니다.
특히 코루틴과 Reactor의 통합에 있어서는 각 패러다임의 특성을 이해하고, awaitSingle()과 같은 브릿지 함수를 적절히 사용하는 것이 중요합니다.
또한 데이터 모델을 정의할 때는 명확한 의미를 갖는 데이터 클래스를 사용하여 코드의 가독성과 유지보수성을 향상시키는 것이 좋습니다.
이러한 개선을 통해 코드의 품질이 향상되고, 개발자가 더 쉽게 이해하고 유지보수할 수 있게 되었습니다.