Redis SortedSet + Websocket 활용한 대기열 서비스 구현(Spring Boot)

개발하는 구황작물·2024년 7월 9일
0

계기

개인 프로젝트로 진행중인 선착순 퀴즈 서비스에 대기열 서비스를 추가하기로 했습니다.

기존 선착순 입장 기능을 구현할 때, 동시성 이슈를 해결하기 위해 Redisson을 활용하였습니다.

하지만 Redisson 자체로는 사용자의 순서를 보장해주지 못합니다.

이를 해결하기 위해 Redis Sorted Set를 활용하여 순서를 보장해주고, 클라이언트와 본인의 대기 순서를 실시간으로 알 수 있도록 WebSocket를 활용하였습니다.

Redis Sorted Set

Redis Sorted Set는 Java의 Set 처럼 고유의 값을 가지는 컬렉션입니다.

다만 차이점이라면 Sorted Set 내부의 값들은 score 라는 값을 기준으로 정렬됩니다.(스코어가 낮은 순에서 높은 순으로 정렬)

여기서 score를 날짜-시간으로 지정해두면 사용자를 선착순으로 정렬 할 수 있습니다.

대기열 서비스 요구사항

  • 들어온 순서대로 입장이 가능해야 한다(순서 보장).
  • 대기열에서 본인의 앞에 몇 명 있는지 알 수 있어야 한다.
  • 유저마다 성공/혹은 실패인지 알 수 있어야 한다.

FLOW

  1. 사용자가 퀴즈 서비스에 접근하기 위해 REST API로 요청을 보냅니다. (GET/wait/{endpoint})
  2. 사용자를 Waiting Queue(Redis SortedSet)에 추가합니다.
  3. QueueScheduler는 n초에 1번씩 Queue 내부의 모든 사용자에게 본인의 앞에 몇 명 있는지 정보 전송합니다.(websocket)
  4. 앞에서 10명씩 queue에서 poll 한 후, 퀴즈 페이지로 이동시킵니다.
  5. 1-4번 까지 반복합니다.

구현

1. ParticipantInfoController

@GetMapping("/wait/{endpoint}")
    public ResponseEntity<ResponseDto<?>> saveParticipant(@PathVariable("endpoint") String endpoint,
                                                          @AuthenticationPrincipal UserAccount user) {
        // queue 에 참가자 추가
        Users users = usersService.findByEmail(user.getUsername());

        Long quizId = participantInfoFacade.getQuizIdByEndpoint(endpoint);
        Long rank = participantInfoQueueService.addQueue(quizId, users.getId());
        return ResponseEntity.ok(ResponseDto.success(new ParticipantQueueResponseDto(quizId, users.getId(),rank)));
    }

사용자가 API에 접근하면 Waiting Queue(Redis SortedSet)에 추가합니다.

2. QueueScheduler

@Slf4j
@EnableScheduling
@Component
@RequiredArgsConstructor
public class QueueScheduler {
    private final ParticipantInfoQueueRepository participantInfoQueueRepository;
    private final ApplicationEventPublisher eventPublisher;
    private final SimpMessagingTemplate messagingTemplate;

    @Async
    @Transactional(value = "redisTx")
    @Scheduled(fixedDelay = 2000)
    public void showRankAndPollUser() {
        Set<Long> quizIdSet = participantInfoQueueRepository.getQuizIdSet();

        for (Long quizId : quizIdSet) {
            Set<Long> allUsersInQuiz = participantInfoQueueRepository.getAllUsers(quizId);
            Set<Long> tenUsersInQuiz = participantInfoQueueRepository.get10Users(quizId);
            for (Long userId : allUsersInQuiz) {
                Long rank = participantInfoQueueRepository.getRank(quizId, userId);
                rank = rank == null ? 0 : rank;
                log.info("userId = {}, rank = {}", userId, rank);
                String endpoint = String.format("?quiz-id=%d", quizId);

                messagingTemplate.convertAndSend("/topic/rank" + endpoint, rank);

                if (tenUsersInQuiz.contains(userId)) {
                    Integer capacity = participantInfoQueueRepository.getParticipantNumber(quizId);
                    if (capacity > 0) {
                        log.info("capacity = {}", capacity);
                        boolean isUserTurn = rank < 10L;
                        eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, true, isUserTurn));
                    } else {
                        eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, false, false));
                    }
                }
                participantInfoQueueRepository.delete10Users(quizId, (long) tenUsersInQuiz.size());
            }

        }
    }
}

1초에 1번씩 queue 내부의 유저 정보들을 loop 타면서 유저 순서 정보를 전송합니다.
만약 유저가 Sorted Set의 상위 10명 내에 포함되면 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대해 전달하는 이벤트를 발생시킵니다.

3. CustomEventListener

@Slf4j
@RequiredArgsConstructor
@Component
public class CustomEventListener {
    private final ResponsesFacade responsesFacade;
    private final ParticipantInfoFacade participantInfoFacade;
    private final SimpMessagingTemplate messagingTemplate;


    @Async
    @Transactional(value = "mongoTx",propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void sendMessage(ParticipantQueueInfoDto participantQueueInfoDto) {
        Long quizId = participantQueueInfoDto.quizId();
        Long userId = participantQueueInfoDto.userId();

        log.info("quizId = {}, userId = {}", quizId, userId);
        String endpoint = String.format("?quiz-id=%d&user-id=%d",quizId, userId);
        messagingTemplate.convertAndSend("/topic/participant" + endpoint, participantQueueInfoDto);
        //참여 가능 시
        if(participantQueueInfoDto.isCapacityLeft()) {
            log.info("save user start");
            participantInfoFacade.saveParticipants(quizId, userId);
        }

    }
}

QueueScheduler에서 이벤트가 발생되면 eventListener에서 websocket을 통해 사용자에게 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대한 정보를 전달합니다.

이후 순서에 들어간 유저들에게 퀴즈 화면을 보여줍니다.

profile
어쩌다보니 개발하게 된 구황작물

0개의 댓글