이번 글에서는 지난번에 기획하고 설계했던 Ficket 대기열 시스템의 실제 구현 과정을 공유하려고 합니다.
기획 단계에서 설정한 목표는 Kafka, Redis, WebSocket, WebFlux를 활용해 대기열 시스템을 만드는 것이었습니다.
이번 구현 과정에서는 설계를 기반으로 작성한 코드와 각 기술의 역할을 자세히 설명하겠습니다.
이번 구현 단계의 핵심은 아래와 같습니다:
currentTime
을 스코어로 사용하여 요청 순서를 보장.Kafka Producer는 사용자 요청을 Kafka 토픽에 메시지 형태로 발행하는 역할을 합니다.
대기열에서 요청의 순서를 보장하기 위해 요청 시간(currentTime
)을 메시지에 포함시켜 Redis ZSet에 저장될 때 정렬을 유지하도록 설계했습니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class QueueProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void addQueue(String userId, String eventId) {
String topic = KeyHelper.getFicketKafkaQueue(eventId);
sendMessage(topic, userId);
}
private void sendMessage(String topic, String userId) {
long currentTime = Instant.now().toEpochMilli();
QueueMessage queueMessage = new QueueMessage(userId, currentTime);
try {
String messageJson = objectMapper.writeValueAsString(queueMessage);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, messageJson);
future.whenComplete((result, ex) -> {
if (ex == null) {
RecordMetadata metadata = result.getRecordMetadata();
log.info("메시지 전송 성공 [User: {}, CurrentTime: {}, Partition: {}, Offset: {}]",
userId, currentTime, metadata.partition(), metadata.offset());
} else {
log.error("메시지 전송 실패 [User: {}, CurrentTime: {}, Error: {}]", userId, currentTime, ex.getMessage());
}
});
} catch (JsonProcessingException e) {
log.error("메시지 변환 실패 [User: {}, Error: {}]", userId, e.getMessage());
}
}
}
addQueue
:
sendMessage
:
currentTime
을 포함하여 이후 Redis ZSet에서 정렬을 보장합니다.Kafka Consumer는 Kafka 토픽에서 메시지를 읽어와 Redis ZSet에 저장하는 역할을 합니다.
여기서 중요한 점은 Kafka 메시지의 currentTime
을 Redis ZSet의 score
로 사용하여 요청 순서를 보장한다는 것입니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class QueueConsumer {
@Qualifier("queueReactiveRedisTemplate")
private final ReactiveRedisTemplate<String, String> queueReactiveRedisTemplate;
private final ObjectMapper objectMapper;
@KafkaListener(topicPattern = "ficket-queue-.*", groupId = "ticketing-group", concurrency = "3")
public void consume(ConsumerRecord<String, String> record) {
String topic = record.topic();
String eventId = topic.replace("ficket-queue-", "");
String messageValue = record.value();
try {
QueueMessage queueMessage = objectMapper.readValue(messageValue, QueueMessage.class);
log.info("Kafka 메시지 수신 [Event: {}, User: {}, CurrentTime: {}]",
eventId, queueMessage.getUserId(), queueMessage.getCurrentTime());
String redisKey = KeyHelper.getFicketRedisQueue(eventId);
addToRedisZSetWithRetry(redisKey, queueMessage.getUserId(), eventId, queueMessage.getCurrentTime());
} catch (Exception e) {
log.error("Kafka 메시지 처리 실패 [Topic: {}, Message: {}, Error: {}]", topic, messageValue, e.getMessage());
}
}
private void addToRedisZSetWithRetry(String redisKey, String userId, String eventId, double score) {
Mono<Boolean> redisOperation = queueReactiveRedisTemplate.opsForZSet()
.add(redisKey, userId, score)
.doOnSuccess(added -> {
if (Boolean.TRUE.equals(added)) {
log.info("Redis ZSet에 대기열 추가 성공 [Event: {}, User: {}, Score: {}]", eventId, userId, score);
} else {
log.warn("Redis ZSet에 이미 존재하는 사용자 [Event: {}, User: {}]", eventId, userId);
}
})
.doOnError(ex -> log.error("Redis ZSet에 대기열 추가 실패 [Event: {}, User: {}, Error: {}]", eventId, userId, ex.getMessage()));
redisOperation.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.doOnError(ex -> log.error("Redis ZSet 재시도 실패: 대기열 추가 불가 [Event: {}, User: {}, Error: {}]", eventId, userId, ex.getMessage()))
.subscribe();
}
}
consume
:
QueueMessage
객체로 역직렬화합니다.member
로, 요청 시간을 score
로 추가합니다.addToRedisZSetWithRetry
:
QueueService
는 대기열 관리의 핵심 서비스로, 다음과 같은 기능을 제공합니다:
1. 사용자 요청 처리: Kafka를 통해 사용자 요청을 대기열에 추가.
2. 대기열 상태 관리:
QueueService
는 Redis ZSet을 활용하여 대기열의 순서를 보장하고, Mono
및 Flux
를 통해 비동기 방식으로 데이터 흐름을 처리합니다.
enterQueue
)역할:
코드:
public Mono<Void> enterQueue(String userId, String eventId) {
return Mono.fromRunnable(() -> queueProducer.addQueue(userId, eventId));
}
설명:
queueProducer.addQueue
: Kafka 토픽에 사용자 요청을 메시지로 발행합니다.Mono.fromRunnable
: 비동기적으로 Kafka Producer를 실행합니다.getUserPosition
)역할:
코드:
public Mono<Long> getUserPosition(String userId, String eventId) {
String redisKey = KeyHelper.getFicketRedisQueue(eventId);
return queueReactiveRedisTemplate.opsForZSet()
.rank(redisKey, userId)
.defaultIfEmpty(-1L)
.map(rank -> rank >= 0 ? rank + 1 : -1L); // 0-based index → 1-based index
}
설명:
rank
: Redis ZSet에서 주어진 사용자 ID의 순위를 조회합니다. (0-based index)defaultIfEmpty(-1L)
: 사용자 ID가 대기열에 없으면 -1L
을 반환합니다.getQueueSize
)역할:
코드:
public Mono<Long> getQueueSize(String eventId) {
String redisKey = KeyHelper.getFicketRedisQueue(eventId);
return queueReactiveRedisTemplate.opsForZSet().size(redisKey);
}
설명:
size
: Redis ZSet의 크기를 반환해 대기열에 남아 있는 사용자의 총 수를 확인합니다.isQueueEmpty
)역할:
코드:
public Mono<Boolean> isQueueEmpty(String eventId) {
String redisKey = KeyHelper.getFicketRedisQueue(eventId);
return queueReactiveRedisTemplate.opsForZSet()
.size(redisKey)
.publishOn(Schedulers.boundedElastic())
.map(queueSize -> queueSize == 0); // 대기열 크기가 0이면 true, 아니면 false 반환
}
설명:
size
: 대기열의 크기를 반환합니다.map(queueSize -> queueSize == 0)
: 크기가 0이면 대기열이 비어 있음을 나타내는 true
를 반환합니다.getNextUserInQueue
)역할:
popMin
을 사용해 가장 낮은 점수(=가장 오래된 요청)를 가진 사용자를 삭제하고 반환합니다.코드:
public Mono<String> getNextUserInQueue(String eventId) {
String queueKey = KeyHelper.getFicketRedisQueue(eventId);
return queueReactiveRedisTemplate.opsForZSet()
.popMin(queueKey) // ZSet에서 가장 낮은 score의 요소를 가져오고 삭제
.flatMap(tuple -> {
if (tuple == null || tuple.getValue() == null) {
log.warn("ZSet이 비어 있음: queueKey={}, eventId={}", queueKey, eventId);
return Mono.empty();
}
String userId = tuple.getValue(); // 가져온 사용자 ID
log.info("ZSet에서 사용자 가져오기 성공: userId={}, eventId={}", userId, eventId);
return Mono.just(userId); // 사용자 ID 반환
})
.doOnError(error -> log.error("ZSet 처리 중 오류 발생: queueKey={}, eventId={}, error={}", queueKey, eventId, error.getMessage()));
}
설명:
popMin
: ZSet에서 가장 낮은 점수(=가장 오래된 요청)를 가진 요소를 가져오고 삭제합니다.flatMap
: 가져온 사용자가 없을 경우 빈 결과(Mono.empty()
)를 반환합니다.leaveQueue
)역할:
코드:
public Mono<Void> leaveQueue(String userId, String eventId) {
String redisKey = KeyHelper.getFicketRedisQueue(eventId);
return queueReactiveRedisTemplate.opsForZSet()
.remove(redisKey, userId)
.flatMap(count -> {
if (count > 0) {
log.info("사용자가 대기열에서 제거되었습니다: userId={}, eventId={}", userId, eventId);
} else {
log.warn("대기열에서 사용자를 찾을 수 없습니다: userId={}, eventId={}", userId, eventId);
}
return Mono.empty();
});
}
설명:
remove
: ZSet에서 특정 사용자를 제거합니다.SlotService
는 슬롯을 관리하는 서비스를 제공합니다.
슬롯은 동시에 작업할 수 있는 최대 사용자 수를 제한하여 시스템의 과부하를 방지하고, 공정하고 안정적인 처리를 보장하기 위해 사용됩니다.
주요 기능은 다음과 같습니다:
1. 슬롯 초기화 및 삭제
2. 사용자별 슬롯 점유 및 해제
3. 작업 가능한 슬롯 수 확인 및 반환
Redis를 기반으로 설계되었으며, Reactive Redis Template을 활용해 논블로킹 방식으로 구현되었습니다.
initializeSlots
)역할:
activeKey
는 현재 점유된 슬롯 수, maxSlotKey
는 최대 슬롯 수를 나타냅니다.코드:
public Mono<Void> initializeSlots(String eventId, int maxSlots) {
String activeKey = KeyHelper.getActiveSlotKey(eventId);
String maxSlotKey = KeyHelper.getMaxSlotKey(eventId);
return slotReactiveRedisTemplate.opsForValue()
.set(activeKey, "0") // 활성 슬롯 초기화
.then(slotReactiveRedisTemplate.opsForValue()
.set(maxSlotKey, String.valueOf(maxSlots))) // 최대 슬롯 설정
.doOnSuccess(unused -> log.info("슬롯 초기화 완료: EventId={}, MaxSlots={}", eventId, maxSlots))
.doOnError(error -> log.error("슬롯 초기화 실패: EventId={}, Error={}", eventId, error.getMessage()));
}
deleteSlots
)역할:
코드:
public Mono<Void> deleteSlots(String eventId) {
String activeKey = KeyHelper.getActiveSlotKey(eventId);
String maxKey = KeyHelper.getMaxSlotKey(eventId);
return slotReactiveRedisTemplate.opsForValue().get(activeKey)
.defaultIfEmpty("0")
.flatMap(activeValue -> {
long activeCount = Long.parseLong(activeValue);
if (activeCount == 0) {
return slotReactiveRedisTemplate.delete(activeKey)
.then(slotReactiveRedisTemplate.delete(maxKey));
} else {
String errorMessage = String.format("슬롯 삭제 실패: 활성 슬롯이 0이 아님 (ActiveCount=%d)", activeCount);
log.error(errorMessage);
return Mono.error(new IllegalStateException(errorMessage));
}
});
}
occupySlot
)역할:
코드:
public Mono<Boolean> occupySlot(String userId, String eventId) {
String activeKey = KeyHelper.getActiveSlotKey(eventId);
String maxKey = KeyHelper.getMaxSlotKey(eventId);
return slotReactiveRedisTemplate.execute(
new DefaultRedisScript<>(OCCUPY_SLOT_SCRIPT, Boolean.class),
List.of(activeKey, maxKey)
).singleOrEmpty()
.flatMap(success -> {
if (Boolean.TRUE.equals(success)) {
return enterWorkSpace(userId, eventId).thenReturn(true);
}
return Mono.just(false);
});
}
releaseSlot
)역할:
코드:
public Mono<Void> releaseSlot(String eventId) {
String activeKey = KeyHelper.getActiveSlotKey(eventId);
return slotReactiveRedisTemplate.execute(
new DefaultRedisScript<>(RELEASE_SLOT_SCRIPT, Boolean.class),
List.of(activeKey)
).singleOrEmpty()
.doOnNext(success -> {
if (Boolean.TRUE.equals(success)) {
log.info("슬롯 해제 성공: EventId={}", eventId);
} else {
log.warn("슬롯 해제 실패 (이미 0): EventId={}", eventId);
}
}).then();
}
releaseSlotByUserId
)역할:
코드:
public Mono<Void> releaseSlotByUserId(String userId) {
String eventId = findEventIdByUserId(userId);
if (eventId == null) {
log.warn("슬롯 해제 실패: EventId를 찾을 수 없음 (UserId={})", userId);
return Mono.empty();
}
String workspaceKey = KeyHelper.getFicketWorkSpace(eventId, userId);
return releaseSlot(eventId)
.then(deleteWorkSpace(workspaceKey))
.doOnSuccess(unused -> log.info("슬롯 해제 완료: UserId={}, EventId={}", userId, eventId))
.doOnError(error -> log.error("슬롯 해제 중 오류: UserId={}, EventId={}, Error={}", userId, eventId, error.getMessage()));
}
hasAvailableSlot
)역할:
코드:
public Mono<Boolean> hasAvailableSlot(String eventId) {
String activeKey = KeyHelper.getActiveSlotKey(eventId);
String maxKey = KeyHelper.getMaxSlotKey(eventId);
return Mono.zip(
slotReactiveRedisTemplate.opsForValue().get(activeKey).defaultIfEmpty("0").map(Long::parseLong),
slotReactiveRedisTemplate.opsForValue().get(maxKey).defaultIfEmpty("0").map(Long::parseLong)
).map(tuple -> tuple.getT1() < tuple.getT2());
}
getAvailableSlots
)역할:
코드:
public Mono<Long> getAvailableSlots(String eventId) {
String maxSlotKey = KeyHelper.getMaxSlotKey(eventId);
String activeSlotKey = KeyHelper.getActiveSlotKey(eventId);
String queueKey = KeyHelper.getFicketRedisQueue(eventId);
Mono<Long> maxSlotsMono = slotReactiveRedisTemplate.opsForValue().get(maxSlotKey)
.map(Long::parseLong)
.defaultIfEmpty(0L);
Mono<Long> activeSlotsMono = slotReactiveRedisTemplate.opsForValue().get(activeSlotKey)
.map(Long::parseLong)
.defaultIfEmpty(0L);
Mono<Long> queueSizeMono = queueReactiveRedisTemplate.opsForZSet()
.size(queueKey)
.defaultIfEmpty(0L);
return Mono.zip(maxSlotsMono, activeSlotsMono, queueSizeMono)
.map(tuple -> {
long maxSlots = tuple.getT1();
long activeSlots = tuple.getT2();
long queueSize = tuple.getT3();
return Math.min(maxSlots - activeSlots, queueSize);
});
}
## 7. WebSocket: 실시간 상태 업데이트
### 1. 역할 및 목적
WebSocket은 사용자와 시스템 간의 실시간 통신을 가능하게 하여, **사용자의 대기열 상태 업데이트** 및 **슬롯 상태 관리**를 실시간으로 제공합니다.
`WebSocketConfig`는 WebSocket 핸들러와 URL 매핑을 설정하며, `QueueStatusWebSocketHandler`는 WebSocket 연결을 통해 대기열 상태를 관리하고 사용자에게 업데이트 정보를 전달합니다.
---
### 2. WebSocketConfig 설정
#### 역할 및 주의사항
`WebSocketConfig`는 WebSocket 핸들러의 경로를 매핑합니다.
**중요**: Spring WebFlux는 WebSocket 의존성이 포함되어 있으므로, 별도로 WebSocket 의존성을 추가하면 안 됩니다.
#### 코드 및 설명
```java
@Configuration
@RequiredArgsConstructor
public class WebSocketConfig {
private final QueueStatusWebSocketHandler queueStatusWebSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> handlerMap = new HashMap<>();
handlerMap.put("/queue-status/*", queueStatusWebSocketHandler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(handlerMap);
mapping.setOrder(-1); // WebSocket 핸들러의 우선 순위를 설정
mapping.setCorsConfigurationSource(corsConfigurationSource()); // CORS 설정 추가
return mapping;
}
private CorsConfigurationSource corsConfigurationSource() {
return exchange -> {
String path = exchange.getRequest().getURI().getPath(); // 요청 URI 경로 가져오기
if (path.startsWith("/queue-status")) {
CorsConfiguration corsConfig = new CorsConfiguration();
corsConfig.addAllowedOrigin("http://localhost:5173");
corsConfig.addAllowedOrigin("http://localhost:8089");
corsConfig.addAllowedMethod("GET");
corsConfig.addAllowedMethod("POST");
corsConfig.addAllowedMethod("PUT");
corsConfig.addAllowedMethod("DELETE");
corsConfig.addAllowedMethod("OPTIONS");
corsConfig.addAllowedMethod("HEAD");
corsConfig.addAllowedHeader("*");
corsConfig.setAllowCredentials(true);
return corsConfig;
}
return null;
};
}
}
queueStatusWebSocketHandler
: WebSocket 연결 요청 시, 대기열 상태를 처리하는 핸들러.handlerMap
: WebSocket 경로(/queue-status/*
)와 핸들러를 매핑.setOrder(-1)
: WebSocket 핸들러의 우선순위를 설정하여 일반 HTTP 핸들러보다 먼저 처리되도록 설정.QueueStatusWebSocketHandler
는 다음과 같은 기능을 수행합니다:
1. WebSocket 연결 및 해제 관리:
handle
메서드)코드:
@NotNull
@Override
public Mono<Void> handle(@NotNull WebSocketSession session) {
String eventId = WebSocketUrlParser.getInfoFromUri(session);
String userId = session.getHandshakeInfo().getHeaders().getFirst("X-User-Id");
if (userId == null || userId.isEmpty()) {
log.warn("X-User-Id 헤더가 누락됨. 세션 ID: {}", session.getId());
return session.close().then();
}
sessionUserMap.put(session.getId(), userId);
eventSessions.computeIfAbsent(eventId, key -> new CopyOnWriteArraySet<>()).add(session);
log.info("WebSocket 연결: 세션 ID={}, 사용자 ID={}, 이벤트 ID={}", session.getId(), userId, eventId);
startMonitorIfNeeded(eventId);
// 상태 업데이트 주기적으로 실행
Mono<Void> periodicUpdates = sendQueueStatusPeriodically(session, eventId);
// 메시지 수신 처리
Mono<Void> messageHandling = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(payload -> handleMessage(session, eventId, payload))
.then();
// WebSocket 종료 처리
Mono<Void> disconnectionHandling = Mono.fromRunnable(() -> handleDisconnect(session, eventId));
return Mono.when(periodicUpdates, messageHandling)
.publishOn(Schedulers.boundedElastic())
.doFinally(signal -> disconnectionHandling.subscribe());
}
설명:
1. 사용자 세션 등록: sessionUserMap
과 eventSessions
에 사용자 세션 정보 등록.
2. 모니터링 시작: 해당 이벤트에 대해 상태 모니터링을 시작.
3. 상태 업데이트 주기 실행: sendQueueStatusPeriodically
를 통해 5초마다 사용자 상태 전송.
4. 종료 처리: WebSocket 연결 종료 시 대기열에서 사용자 제거 및 슬롯 해제.
sendQueueStatusPeriodically
)코드:
private Mono<Void> sendQueueStatusPeriodically(WebSocketSession session, String eventId) {
String userId = sessionUserMap.get(session.getId());
if (userId == null) {
log.warn("사용자를 찾을 수 없음: 세션 ID={}", session.getId());
closeSession(session);
return Mono.empty();
}
return Flux.interval(Duration.ofSeconds(5)) // 5초 간격으로 상태 전송
.takeWhile(tick -> session.isOpen()) // 세션이 열려 있는 동안만 실행
.flatMap(tick -> sendQueueStatus(session, eventId)) // 상태 업데이트
.then();
}
설명:
Flux.interval
: 5초마다 실행.takeWhile(session.isOpen())
: WebSocket 세션이 열려 있는 동안에만 실행.sendQueueStatus
: 사용자에게 대기열 상태를 전송.sendQueueStatus
)코드:
private Mono<Void> sendQueueStatus(WebSocketSession session, String eventId) {
String userId = sessionUserMap.get(session.getId());
if (userId == null) {
log.warn("사용자를 찾을 수 없음: 세션 ID={}", session.getId());
closeSession(session);
return Mono.empty();
}
return queueService.getQueueSize(eventId)
.flatMap(totalQueue -> queueService.getUserPosition(userId, eventId)
.flatMap(position -> {
QueueStatus status = getQueueStatus(position); // 사용자 상태 계산
String message = buildResponse(userId, eventId, position, totalQueue, status); // 메시지 생성
return sendWebSocketMessage(session, message); // WebSocket 메시지 전송
})
)
.doOnError(error -> log.error("대기열 상태 전송 실패: 세션 ID={}, 이벤트 ID={}, 오류={}", session.getId(), eventId, error.getMessage()))
.onErrorResume(error -> {
closeSession(session); // 오류 발생 시 세션 닫기
return Mono.empty();
});
}
설명:
queueService.getQueueSize
: Redis에서 대기열의 전체 크기를 가져옴.queueService.getUserPosition
: 사용자의 대기열 위치를 가져옴.sendWebSocketMessage
: WebSocket 메시지로 사용자 상태를 전송.monitorAndHandleQueue
)코드:
private Mono<Void> monitorAndHandleQueue(String eventId) {
return Flux.interval(Duration.ofSeconds(5)) // 5초 주기로 슬롯 상태 확인
.takeWhile(tick -> eventSessions.containsKey(eventId) && !eventSessions.get(eventId).isEmpty())
.flatMap(tick -> slotService.hasAvailableSlot(eventId)
.flatMapMany(canEnter -> {
if (canEnter) {
return processQueue(eventId);
}
return Flux.empty();
})
)
.then()
.doOnTerminate(() -> log.info("이벤트 ID={} 모니터링 작업이 종료되었습니다.", eventId));
}
설명:
1. Flux.interval
: 5초마다 슬롯 상태를 확인.
2. slotService.hasAvailableSlot
: 남은 슬롯 여부 확인.
3. processQueue
: 대기열에서 다음 사용자를 처리.
이번 글에서는 Ficket 대기열 시스템의 핵심 구현 과정을 다뤘습니다. Kafka를 활용한 메시지 큐와 Redis ZSet을 통한 대기열 정렬, WebSocket을 이용한 실시간 상태 업데이트 등 설계 단계에서 정의한 목표를 실제 코드로 구현해 봤습니다.
다음 글에서는 Locust를 활용한 성능 테스트와 최적화 과정을 자세히 다룰 예정입니다. 읽어주셔서 감사합니다!
Reference