대기열 구현 (2) - Kafka, Redis, WebSocket, WebFlux를 활용한 대기열 관리

오형상·2025년 1월 20일
0

Ficket

목록 보기
27/27

이번 글에서는 지난번에 기획하고 설계했던 Ficket 대기열 시스템의 실제 구현 과정을 공유하려고 합니다.
기획 단계에서 설정한 목표는 Kafka, Redis, WebSocket, WebFlux를 활용해 대기열 시스템을 만드는 것이었습니다.

이번 구현 과정에서는 설계를 기반으로 작성한 코드와 각 기술의 역할을 자세히 설명하겠습니다.


2. 구현 개요

이번 구현 단계의 핵심은 아래와 같습니다:

  1. Kafka Producer: 사용자 요청을 Kafka 토픽에 메시지로 발행.
  2. Kafka Consumer: 메시지를 처리하여 Redis ZSet에 저장, 정렬된 대기열 유지.
  3. Redis ZSet: Kafka 메시지의 currentTime을 스코어로 사용하여 요청 순서를 보장.
  4. WebSocket: 사용자에게 실시간 대기열 상태 전달.

3. Kafka Producer 구현

1. 역할 및 목적

Kafka Producer는 사용자 요청을 Kafka 토픽에 메시지 형태로 발행하는 역할을 합니다.
대기열에서 요청의 순서를 보장하기 위해 요청 시간(currentTime)을 메시지에 포함시켜 Redis ZSet에 저장될 때 정렬을 유지하도록 설계했습니다.

2. 코드

@Slf4j
@Service
@RequiredArgsConstructor
public class QueueProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public void addQueue(String userId, String eventId) {
        String topic = KeyHelper.getFicketKafkaQueue(eventId);
        sendMessage(topic, userId);
    }

    private void sendMessage(String topic, String userId) {
        long currentTime = Instant.now().toEpochMilli();
        QueueMessage queueMessage = new QueueMessage(userId, currentTime);

        try {
            String messageJson = objectMapper.writeValueAsString(queueMessage);
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, messageJson);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("메시지 전송 성공 [User: {}, CurrentTime: {}, Partition: {}, Offset: {}]",
                            userId, currentTime, metadata.partition(), metadata.offset());
                } else {
                    log.error("메시지 전송 실패 [User: {}, CurrentTime: {}, Error: {}]", userId, currentTime, ex.getMessage());
                }
            });
        } catch (JsonProcessingException e) {
            log.error("메시지 변환 실패 [User: {}, Error: {}]", userId, e.getMessage());
        }
    }
}

3. 설명

  1. addQueue:

    • 사용자 요청을 받아 해당 이벤트의 Kafka 토픽으로 메시지를 발행합니다.
    • 토픽 이름은 이벤트 ID를 기준으로 동적으로 생성됩니다.
  2. sendMessage:

    • 메시지에 사용자 ID와 요청 시간을 포함하여 JSON으로 직렬화한 뒤 Kafka 토픽에 전송합니다.
    • Kafka의 병렬 처리 특성으로 인해 메시지 순서가 뒤섞일 수 있으므로 currentTime을 포함하여 이후 Redis ZSet에서 정렬을 보장합니다.
    • 메시지 전송 성공/실패 여부를 로깅합니다.

4. Kafka Consumer 구현

1. 역할 및 목적

Kafka Consumer는 Kafka 토픽에서 메시지를 읽어와 Redis ZSet에 저장하는 역할을 합니다.
여기서 중요한 점은 Kafka 메시지의 currentTime을 Redis ZSet의 score로 사용하여 요청 순서를 보장한다는 것입니다.

2. 코드

@Slf4j
@Service
@RequiredArgsConstructor
public class QueueConsumer {

    @Qualifier("queueReactiveRedisTemplate")
    private final ReactiveRedisTemplate<String, String> queueReactiveRedisTemplate;
    private final ObjectMapper objectMapper;

    @KafkaListener(topicPattern = "ficket-queue-.*", groupId = "ticketing-group", concurrency = "3")
    public void consume(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String eventId = topic.replace("ficket-queue-", "");
        String messageValue = record.value();

        try {
            QueueMessage queueMessage = objectMapper.readValue(messageValue, QueueMessage.class);
            log.info("Kafka 메시지 수신 [Event: {}, User: {}, CurrentTime: {}]",
                    eventId, queueMessage.getUserId(), queueMessage.getCurrentTime());

            String redisKey = KeyHelper.getFicketRedisQueue(eventId);
            addToRedisZSetWithRetry(redisKey, queueMessage.getUserId(), eventId, queueMessage.getCurrentTime());

        } catch (Exception e) {
            log.error("Kafka 메시지 처리 실패 [Topic: {}, Message: {}, Error: {}]", topic, messageValue, e.getMessage());
        }
    }

    private void addToRedisZSetWithRetry(String redisKey, String userId, String eventId, double score) {
        Mono<Boolean> redisOperation = queueReactiveRedisTemplate.opsForZSet()
                .add(redisKey, userId, score)
                .doOnSuccess(added -> {
                    if (Boolean.TRUE.equals(added)) {
                        log.info("Redis ZSet에 대기열 추가 성공 [Event: {}, User: {}, Score: {}]", eventId, userId, score);
                    } else {
                        log.warn("Redis ZSet에 이미 존재하는 사용자 [Event: {}, User: {}]", eventId, userId);
                    }
                })
                .doOnError(ex -> log.error("Redis ZSet에 대기열 추가 실패 [Event: {}, User: {}, Error: {}]", eventId, userId, ex.getMessage()));

        redisOperation.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
                .doOnError(ex -> log.error("Redis ZSet 재시도 실패: 대기열 추가 불가 [Event: {}, User: {}, Error: {}]", eventId, userId, ex.getMessage()))
                .subscribe();
    }
}

3. 설명

  1. consume:

    • Kafka에서 메시지를 읽어와 JSON 데이터를 QueueMessage 객체로 역직렬화합니다.
    • Redis ZSet에 사용자 ID를 member로, 요청 시간을 score로 추가합니다.
  2. addToRedisZSetWithRetry:

    • Redis ZSet에서 동일한 사용자 ID를 중복으로 추가하지 않도록 제어합니다.
    • 메시지 처리 실패 시 최대 3회까지 재시도를 수행합니다.

5. QueueService: 대기열 관리

1. 역할 및 목적

QueueService대기열 관리의 핵심 서비스로, 다음과 같은 기능을 제공합니다:
1. 사용자 요청 처리: Kafka를 통해 사용자 요청을 대기열에 추가.
2. 대기열 상태 관리:

  • 사용자의 대기열 위치 조회.
  • 전체 대기열 크기 확인.
  1. 대기열 작업 처리:
    • 다음 사용자 가져오기.
    • 대기열에서 사용자 제거.

QueueServiceRedis ZSet을 활용하여 대기열의 순서를 보장하고, MonoFlux를 통해 비동기 방식으로 데이터 흐름을 처리합니다.

2. 주요 메서드 설명

(1) 대기열에 사용자 추가 (enterQueue)

역할:

  • 사용자를 Kafka Producer를 통해 대기열에 추가합니다.
  • Kafka를 활용하여 확장성과 안정성을 보장합니다.

코드:

public Mono<Void> enterQueue(String userId, String eventId) {
    return Mono.fromRunnable(() -> queueProducer.addQueue(userId, eventId));
}

설명:

  • queueProducer.addQueue: Kafka 토픽에 사용자 요청을 메시지로 발행합니다.
  • Mono.fromRunnable: 비동기적으로 Kafka Producer를 실행합니다.

(2) 사용자 대기열 위치 조회 (getUserPosition)

역할:

  • 주어진 사용자 ID에 해당하는 대기열에서의 위치를 조회합니다.

코드:

public Mono<Long> getUserPosition(String userId, String eventId) {
    String redisKey = KeyHelper.getFicketRedisQueue(eventId);
    return queueReactiveRedisTemplate.opsForZSet()
            .rank(redisKey, userId)
            .defaultIfEmpty(-1L)
            .map(rank -> rank >= 0 ? rank + 1 : -1L); // 0-based index → 1-based index
}

설명:

  • rank: Redis ZSet에서 주어진 사용자 ID의 순위를 조회합니다. (0-based index)
  • defaultIfEmpty(-1L): 사용자 ID가 대기열에 없으면 -1L을 반환합니다.
  • 1-based index로 변환: 순위에 1을 더해 사용자 친화적인 번호로 반환합니다.

(3) 대기열 크기 조회 (getQueueSize)

역할:

  • 특정 이벤트에 대한 대기열의 전체 크기를 반환합니다.

코드:

public Mono<Long> getQueueSize(String eventId) {
    String redisKey = KeyHelper.getFicketRedisQueue(eventId);
    return queueReactiveRedisTemplate.opsForZSet().size(redisKey);
}

설명:

  • size: Redis ZSet의 크기를 반환해 대기열에 남아 있는 사용자의 총 수를 확인합니다.

(4) 대기열 여부 확인 (isQueueEmpty)

역할:

  • 특정 이벤트에 대한 대기열이 비어 있는지 확인합니다.

코드:

public Mono<Boolean> isQueueEmpty(String eventId) {
    String redisKey = KeyHelper.getFicketRedisQueue(eventId);
    return queueReactiveRedisTemplate.opsForZSet()
            .size(redisKey)
            .publishOn(Schedulers.boundedElastic())
            .map(queueSize -> queueSize == 0); // 대기열 크기가 0이면 true, 아니면 false 반환
}

설명:

  • size: 대기열의 크기를 반환합니다.
  • map(queueSize -> queueSize == 0): 크기가 0이면 대기열이 비어 있음을 나타내는 true를 반환합니다.

(5) 대기열에서 다음 사용자 가져오기 (getNextUserInQueue)

역할:

  • 대기열에서 가장 오래 대기한 사용자를 가져옵니다.
  • Redis ZSet의 popMin을 사용해 가장 낮은 점수(=가장 오래된 요청)를 가진 사용자를 삭제하고 반환합니다.

코드:

public Mono<String> getNextUserInQueue(String eventId) {
    String queueKey = KeyHelper.getFicketRedisQueue(eventId);

    return queueReactiveRedisTemplate.opsForZSet()
            .popMin(queueKey) // ZSet에서 가장 낮은 score의 요소를 가져오고 삭제
            .flatMap(tuple -> {
                if (tuple == null || tuple.getValue() == null) {
                    log.warn("ZSet이 비어 있음: queueKey={}, eventId={}", queueKey, eventId);
                    return Mono.empty();
                }
                String userId = tuple.getValue(); // 가져온 사용자 ID
                log.info("ZSet에서 사용자 가져오기 성공: userId={}, eventId={}", userId, eventId);
                return Mono.just(userId); // 사용자 ID 반환
            })
            .doOnError(error -> log.error("ZSet 처리 중 오류 발생: queueKey={}, eventId={}, error={}", queueKey, eventId, error.getMessage()));
}

설명:

  • popMin: ZSet에서 가장 낮은 점수(=가장 오래된 요청)를 가진 요소를 가져오고 삭제합니다.
  • flatMap: 가져온 사용자가 없을 경우 빈 결과(Mono.empty())를 반환합니다.

(6) 대기열 나가기 (leaveQueue)

역할:

  • 특정 사용자를 대기열에서 제거합니다.

코드:

public Mono<Void> leaveQueue(String userId, String eventId) {
    String redisKey = KeyHelper.getFicketRedisQueue(eventId);
    return queueReactiveRedisTemplate.opsForZSet()
            .remove(redisKey, userId)
            .flatMap(count -> {
                if (count > 0) {
                    log.info("사용자가 대기열에서 제거되었습니다: userId={}, eventId={}", userId, eventId);
                } else {
                    log.warn("대기열에서 사용자를 찾을 수 없습니다: userId={}, eventId={}", userId, eventId);
                }
                return Mono.empty();
            });
}

설명:

  • remove: ZSet에서 특정 사용자를 제거합니다.
  • 결과 확인: 제거된 사용자가 있는 경우 성공 로그를, 없는 경우 경고 로그를 출력합니다.

6. SlotService: 슬롯 관리

1. 역할 및 목적

SlotService슬롯을 관리하는 서비스를 제공합니다.
슬롯은 동시에 작업할 수 있는 최대 사용자 수를 제한하여 시스템의 과부하를 방지하고, 공정하고 안정적인 처리를 보장하기 위해 사용됩니다.

주요 기능은 다음과 같습니다:
1. 슬롯 초기화 및 삭제
2. 사용자별 슬롯 점유 및 해제
3. 작업 가능한 슬롯 수 확인 및 반환

Redis를 기반으로 설계되었으며, Reactive Redis Template을 활용해 논블로킹 방식으로 구현되었습니다.

2. 주요 메서드

(1) 슬롯 초기화 (initializeSlots)

역할:

  • 주어진 이벤트 ID에 대해 슬롯 정보를 초기화합니다.
  • activeKey는 현재 점유된 슬롯 수, maxSlotKey는 최대 슬롯 수를 나타냅니다.

코드:

public Mono<Void> initializeSlots(String eventId, int maxSlots) {
    String activeKey = KeyHelper.getActiveSlotKey(eventId);
    String maxSlotKey = KeyHelper.getMaxSlotKey(eventId);
    return slotReactiveRedisTemplate.opsForValue()
            .set(activeKey, "0") // 활성 슬롯 초기화
            .then(slotReactiveRedisTemplate.opsForValue()
                    .set(maxSlotKey, String.valueOf(maxSlots))) // 최대 슬롯 설정
            .doOnSuccess(unused -> log.info("슬롯 초기화 완료: EventId={}, MaxSlots={}", eventId, maxSlots))
            .doOnError(error -> log.error("슬롯 초기화 실패: EventId={}, Error={}", eventId, error.getMessage()));
}

(2) 슬롯 제거 (deleteSlots)

역할:

  • 슬롯 정보를 삭제합니다.
  • 현재 점유된 슬롯이 0일 때만 삭제할 수 있습니다.

코드:

public Mono<Void> deleteSlots(String eventId) {
    String activeKey = KeyHelper.getActiveSlotKey(eventId);
    String maxKey = KeyHelper.getMaxSlotKey(eventId);

    return slotReactiveRedisTemplate.opsForValue().get(activeKey)
            .defaultIfEmpty("0")
            .flatMap(activeValue -> {
                long activeCount = Long.parseLong(activeValue);
                if (activeCount == 0) {
                    return slotReactiveRedisTemplate.delete(activeKey)
                            .then(slotReactiveRedisTemplate.delete(maxKey));
                } else {
                    String errorMessage = String.format("슬롯 삭제 실패: 활성 슬롯이 0이 아님 (ActiveCount=%d)", activeCount);
                    log.error(errorMessage);
                    return Mono.error(new IllegalStateException(errorMessage));
                }
            });
}

(3) 슬롯 점유 (occupySlot)

역할:

  • 사용자가 슬롯을 점유하려고 할 때 호출됩니다.
  • Redis Lua 스크립트를 사용해 현재 활성 슬롯 수와 최대 슬롯 수를 비교하고, 슬롯 점유 여부를 결정합니다.
  • 슬롯 점유에 성공하면 해당 사용자를 작업 공간에 등록합니다.

코드:

public Mono<Boolean> occupySlot(String userId, String eventId) {
    String activeKey = KeyHelper.getActiveSlotKey(eventId);
    String maxKey = KeyHelper.getMaxSlotKey(eventId);

    return slotReactiveRedisTemplate.execute(
            new DefaultRedisScript<>(OCCUPY_SLOT_SCRIPT, Boolean.class),
            List.of(activeKey, maxKey)
    ).singleOrEmpty()
    .flatMap(success -> {
        if (Boolean.TRUE.equals(success)) {
            return enterWorkSpace(userId, eventId).thenReturn(true);
        }
        return Mono.just(false);
    });
}

(4) 슬롯 해제 (releaseSlot)

역할:

  • 이벤트 ID에 해당하는 슬롯을 하나 해제합니다.
  • Redis Lua 스크립트를 사용해 활성 슬롯 수를 감소시키고, 그 결과를 반환합니다.

코드:

public Mono<Void> releaseSlot(String eventId) {
    String activeKey = KeyHelper.getActiveSlotKey(eventId);

    return slotReactiveRedisTemplate.execute(
            new DefaultRedisScript<>(RELEASE_SLOT_SCRIPT, Boolean.class),
            List.of(activeKey)
    ).singleOrEmpty()
    .doOnNext(success -> {
        if (Boolean.TRUE.equals(success)) {
            log.info("슬롯 해제 성공: EventId={}", eventId);
        } else {
            log.warn("슬롯 해제 실패 (이미 0): EventId={}", eventId);
        }
    }).then();
}

(5) 사용자 기반 슬롯 해제 (releaseSlotByUserId)

역할:

  • 특정 사용자의 슬롯 점유를 해제하고, 해당 사용자의 작업 공간을 삭제합니다.

코드:

public Mono<Void> releaseSlotByUserId(String userId) {
    String eventId = findEventIdByUserId(userId);

    if (eventId == null) {
        log.warn("슬롯 해제 실패: EventId를 찾을 수 없음 (UserId={})", userId);
        return Mono.empty();
    }

    String workspaceKey = KeyHelper.getFicketWorkSpace(eventId, userId);

    return releaseSlot(eventId)
            .then(deleteWorkSpace(workspaceKey))
            .doOnSuccess(unused -> log.info("슬롯 해제 완료: UserId={}, EventId={}", userId, eventId))
            .doOnError(error -> log.error("슬롯 해제 중 오류: UserId={}, EventId={}, Error={}", userId, eventId, error.getMessage()));
}

(6) 작업 가능한 슬롯 확인 (hasAvailableSlot)

역할:

  • 현재 이벤트에 작업 가능한 슬롯이 있는지 확인합니다.
  • 활성 슬롯 수가 최대 슬롯 수보다 작은지 비교합니다.

코드:

public Mono<Boolean> hasAvailableSlot(String eventId) {
    String activeKey = KeyHelper.getActiveSlotKey(eventId);
    String maxKey = KeyHelper.getMaxSlotKey(eventId);

    return Mono.zip(
            slotReactiveRedisTemplate.opsForValue().get(activeKey).defaultIfEmpty("0").map(Long::parseLong),
            slotReactiveRedisTemplate.opsForValue().get(maxKey).defaultIfEmpty("0").map(Long::parseLong)
    ).map(tuple -> tuple.getT1() < tuple.getT2());
}

(7) 작업 가능한 슬롯 수 반환 (getAvailableSlots)

역할:

  • 주어진 이벤트 ID에 대해 현재 작업 가능한 슬롯 수를 반환합니다.
  • 남은 슬롯 수와 대기열 크기를 비교하여 최소값을 반환합니다.

코드:

public Mono<Long> getAvailableSlots(String eventId) {
    String maxSlotKey = KeyHelper.getMaxSlotKey(eventId);
    String activeSlotKey = KeyHelper.getActiveSlotKey(eventId);
    String queueKey = KeyHelper.getFicketRedisQueue(eventId);

    Mono<Long> maxSlotsMono = slotReactiveRedisTemplate.opsForValue().get(maxSlotKey)
            .map(Long::parseLong)
            .defaultIfEmpty(0L);

    Mono<Long> activeSlotsMono = slotReactiveRedisTemplate.opsForValue().get(activeSlotKey)
            .map(Long::parseLong)
            .defaultIfEmpty(0L);

    Mono<Long> queueSizeMono = queueReactiveRedisTemplate.opsForZSet()
            .size(queueKey)
            .defaultIfEmpty(0L);

    return Mono.zip(maxSlotsMono, activeSlotsMono, queueSizeMono)
            .map(tuple -> {
                long maxSlots = tuple.getT1();
                long activeSlots = tuple.getT2();
                long queueSize = tuple.getT3();
                return Math.min(maxSlots - activeSlots, queueSize);
            });
}

## 7. WebSocket: 실시간 상태 업데이트

### 1. 역할 및 목적

WebSocket은 사용자와 시스템 간의 실시간 통신을 가능하게 하여, **사용자의 대기열 상태 업데이트** 및 **슬롯 상태 관리**를 실시간으로 제공합니다.  
`WebSocketConfig`는 WebSocket 핸들러와 URL 매핑을 설정하며, `QueueStatusWebSocketHandler`는 WebSocket 연결을 통해 대기열 상태를 관리하고 사용자에게 업데이트 정보를 전달합니다.

---

### 2. WebSocketConfig 설정

#### 역할 및 주의사항

`WebSocketConfig`는 WebSocket 핸들러의 경로를 매핑합니다.  
**중요**: Spring WebFlux는 WebSocket 의존성이 포함되어 있으므로, 별도로 WebSocket 의존성을 추가하면 안 됩니다.

#### 코드 및 설명
```java
@Configuration
@RequiredArgsConstructor
public class WebSocketConfig {
    private final QueueStatusWebSocketHandler queueStatusWebSocketHandler;

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> handlerMap = new HashMap<>();
        handlerMap.put("/queue-status/*", queueStatusWebSocketHandler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(handlerMap);
        mapping.setOrder(-1); // WebSocket 핸들러의 우선 순위를 설정
        mapping.setCorsConfigurationSource(corsConfigurationSource()); // CORS 설정 추가
        return mapping;
    }
    
    private CorsConfigurationSource corsConfigurationSource() {
        return exchange -> {
            String path = exchange.getRequest().getURI().getPath(); // 요청 URI 경로 가져오기
            if (path.startsWith("/queue-status")) {
                CorsConfiguration corsConfig = new CorsConfiguration();
                corsConfig.addAllowedOrigin("http://localhost:5173");
                corsConfig.addAllowedOrigin("http://localhost:8089");
                corsConfig.addAllowedMethod("GET");
                corsConfig.addAllowedMethod("POST");
                corsConfig.addAllowedMethod("PUT");
                corsConfig.addAllowedMethod("DELETE");
                corsConfig.addAllowedMethod("OPTIONS");
                corsConfig.addAllowedMethod("HEAD");
                corsConfig.addAllowedHeader("*");
                corsConfig.setAllowCredentials(true);
                return corsConfig;
            }
            return null;
        };
    }
}
  • queueStatusWebSocketHandler: WebSocket 연결 요청 시, 대기열 상태를 처리하는 핸들러.
  • handlerMap: WebSocket 경로(/queue-status/*)와 핸들러를 매핑.
  • setOrder(-1): WebSocket 핸들러의 우선순위를 설정하여 일반 HTTP 핸들러보다 먼저 처리되도록 설정.

3. QueueStatusWebSocketHandler 구현

역할 및 주요 기능

QueueStatusWebSocketHandler는 다음과 같은 기능을 수행합니다:
1. WebSocket 연결 및 해제 관리:

  • 사용자 세션 등록 및 해제.
  • 연결 종료 시, 사용자를 대기열에서 제거하고 슬롯 해제.
  1. 주기적인 상태 업데이트:
    • Redis에서 사용자 상태를 가져와 5초마다 사용자에게 전달.
  2. 슬롯 및 대기열 상태 모니터링:
    • 남은 슬롯 수를 확인하고, 대기열에서 다음 사용자를 처리.
  3. 실시간 메시지 전송:
    • 사용자에게 대기열 상태를 WebSocket 메시지로 전송.

주요 메서드와 설명

(1) WebSocket 연결 관리 (handle 메서드)

코드:

@NotNull
@Override
public Mono<Void> handle(@NotNull WebSocketSession session) {
    String eventId = WebSocketUrlParser.getInfoFromUri(session);
    String userId = session.getHandshakeInfo().getHeaders().getFirst("X-User-Id");

    if (userId == null || userId.isEmpty()) {
        log.warn("X-User-Id 헤더가 누락됨. 세션 ID: {}", session.getId());
        return session.close().then();
    }

    sessionUserMap.put(session.getId(), userId);
    eventSessions.computeIfAbsent(eventId, key -> new CopyOnWriteArraySet<>()).add(session);

    log.info("WebSocket 연결: 세션 ID={}, 사용자 ID={}, 이벤트 ID={}", session.getId(), userId, eventId);

    startMonitorIfNeeded(eventId);

    // 상태 업데이트 주기적으로 실행
    Mono<Void> periodicUpdates = sendQueueStatusPeriodically(session, eventId);

    // 메시지 수신 처리
    Mono<Void> messageHandling = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(payload -> handleMessage(session, eventId, payload))
            .then();

    // WebSocket 종료 처리
    Mono<Void> disconnectionHandling = Mono.fromRunnable(() -> handleDisconnect(session, eventId));

    return Mono.when(periodicUpdates, messageHandling)
            .publishOn(Schedulers.boundedElastic())
            .doFinally(signal -> disconnectionHandling.subscribe());
}

설명:
1. 사용자 세션 등록: sessionUserMapeventSessions에 사용자 세션 정보 등록.
2. 모니터링 시작: 해당 이벤트에 대해 상태 모니터링을 시작.
3. 상태 업데이트 주기 실행: sendQueueStatusPeriodically를 통해 5초마다 사용자 상태 전송.
4. 종료 처리: WebSocket 연결 종료 시 대기열에서 사용자 제거 및 슬롯 해제.


(2) 상태 업데이트 주기 실행 (sendQueueStatusPeriodically)

코드:

private Mono<Void> sendQueueStatusPeriodically(WebSocketSession session, String eventId) {
    String userId = sessionUserMap.get(session.getId());
    if (userId == null) {
        log.warn("사용자를 찾을 수 없음: 세션 ID={}", session.getId());
        closeSession(session);
        return Mono.empty();
    }

    return Flux.interval(Duration.ofSeconds(5)) // 5초 간격으로 상태 전송
            .takeWhile(tick -> session.isOpen()) // 세션이 열려 있는 동안만 실행
            .flatMap(tick -> sendQueueStatus(session, eventId)) // 상태 업데이트
            .then();
}

설명:

  • Flux.interval: 5초마다 실행.
  • takeWhile(session.isOpen()): WebSocket 세션이 열려 있는 동안에만 실행.
  • sendQueueStatus: 사용자에게 대기열 상태를 전송.

(3) 대기열 상태 전송 (sendQueueStatus)

코드:

private Mono<Void> sendQueueStatus(WebSocketSession session, String eventId) {
    String userId = sessionUserMap.get(session.getId());
    if (userId == null) {
        log.warn("사용자를 찾을 수 없음: 세션 ID={}", session.getId());
        closeSession(session);
        return Mono.empty();
    }

    return queueService.getQueueSize(eventId)
            .flatMap(totalQueue -> queueService.getUserPosition(userId, eventId)
                    .flatMap(position -> {
                        QueueStatus status = getQueueStatus(position); // 사용자 상태 계산
                        String message = buildResponse(userId, eventId, position, totalQueue, status); // 메시지 생성
                        return sendWebSocketMessage(session, message); // WebSocket 메시지 전송
                    })
            )
            .doOnError(error -> log.error("대기열 상태 전송 실패: 세션 ID={}, 이벤트 ID={}, 오류={}", session.getId(), eventId, error.getMessage()))
            .onErrorResume(error -> {
                closeSession(session); // 오류 발생 시 세션 닫기
                return Mono.empty();
            });
}

설명:

  • queueService.getQueueSize: Redis에서 대기열의 전체 크기를 가져옴.
  • queueService.getUserPosition: 사용자의 대기열 위치를 가져옴.
  • sendWebSocketMessage: WebSocket 메시지로 사용자 상태를 전송.

(4) 슬롯 상태 모니터링 (monitorAndHandleQueue)

코드:

private Mono<Void> monitorAndHandleQueue(String eventId) {
    return Flux.interval(Duration.ofSeconds(5)) // 5초 주기로 슬롯 상태 확인
            .takeWhile(tick -> eventSessions.containsKey(eventId) && !eventSessions.get(eventId).isEmpty())
            .flatMap(tick -> slotService.hasAvailableSlot(eventId)
                    .flatMapMany(canEnter -> {
                        if (canEnter) {
                            return processQueue(eventId);
                        }
                        return Flux.empty();
                    })
            )
            .then()
            .doOnTerminate(() -> log.info("이벤트 ID={} 모니터링 작업이 종료되었습니다.", eventId));
}

설명:
1. Flux.interval: 5초마다 슬롯 상태를 확인.
2. slotService.hasAvailableSlot: 남은 슬롯 여부 확인.
3. processQueue: 대기열에서 다음 사용자를 처리.


이번 글에서는 Ficket 대기열 시스템의 핵심 구현 과정을 다뤘습니다. Kafka를 활용한 메시지 큐와 Redis ZSet을 통한 대기열 정렬, WebSocket을 이용한 실시간 상태 업데이트 등 설계 단계에서 정의한 목표를 실제 코드로 구현해 봤습니다.
다음 글에서는 Locust를 활용한 성능 테스트와 최적화 과정을 자세히 다룰 예정입니다. 읽어주셔서 감사합니다!

Reference

0개의 댓글