Redis + Websocket 활용한 대기열 서비스 구현(Spring Boot)

개발하는 구황작물·2024년 7월 9일
0
post-custom-banner

계기

개인 프로젝트로 진행중인 선착순 퀴즈 서비스에 대기열 서비스를 추가하기로 했습니다.

아무래도 선착순 서비스이다보니 한번에 트래픽이 몰릴 것을 예상하고 만드는 것이 좋겠다 생각해서 만들게 되었습니다.

대기열 서비스 요구사항

  • 들어온 순서대로 입장이 가능해야 한다.
  • 대기열에서 본인의 앞에 몇 명 있는지 알 수 있어야 한다.
  • 유저마다 성공/혹은 실패인지 알 수 있어야 한다.

FLOW

  1. 사용자가 퀴즈 서비스에 접근하기 위해 REST API로 요청을 보냅니다. (GET/wait/{endpoint})
  2. 사용자를 Waiting Queue(Redis SortedSet)에 추가합니다.
  3. QueueScheduler는 n초에 1번씩 Queue 내부의 모든 사용자에게 본인의 앞에 몇 명 있는지 정보 전송합니다.(websocket)
  4. 앞에서 10명씩 queue에서 poll 한 후, 퀴즈 페이지로 이동시킵니다.
  5. 1-4번 까지 반복

구현

1. ParticipantInfoController

@GetMapping("/wait/{endpoint}")
    public ResponseEntity<ResponseDto<?>> saveParticipant(@PathVariable("endpoint") String endpoint,
                                                          @AuthenticationPrincipal UserAccount user) {
        // queue 에 참가자 추가
        Users users = usersService.findByEmail(user.getUsername());

        Long quizId = participantInfoFacade.getQuizIdByEndpoint(endpoint);
        Long rank = participantInfoQueueService.addQueue(quizId, users.getId());
        return ResponseEntity.ok(ResponseDto.success(new ParticipantQueueResponseDto(quizId, users.getId(),rank)));
    }

사용자가 API에 접근하면 Waiting Queue(Redis SortedSet)에 추가합니다.

2. QueueScheduler

@Slf4j
@EnableScheduling
@Component
@RequiredArgsConstructor
public class QueueScheduler {
    private final ParticipantInfoQueueRepository participantInfoQueueRepository;
    private final ApplicationEventPublisher eventPublisher;
    private final SimpMessagingTemplate messagingTemplate;

    @Async
    @Transactional(value = "redisTx")
    @Scheduled(fixedDelay = 2000)
    public void showRankAndPollUser() {
        Set<Long> quizIdSet = participantInfoQueueRepository.getQuizIdSet();

        for (Long quizId : quizIdSet) {
            Set<Long> allUsersInQuiz = participantInfoQueueRepository.getAllUsers(quizId);
            Set<Long> tenUsersInQuiz = participantInfoQueueRepository.get10Users(quizId);
            for (Long userId : allUsersInQuiz) {
                Long rank = participantInfoQueueRepository.getRank(quizId, userId);
                rank = rank == null ? 0 : rank;
                log.info("userId = {}, rank = {}", userId, rank);
                String endpoint = String.format("?quiz-id=%d", quizId);

                messagingTemplate.convertAndSend("/topic/rank" + endpoint, rank);

                if (tenUsersInQuiz.contains(userId)) {
                    Integer capacity = participantInfoQueueRepository.getParticipantNumber(quizId);
                    if (capacity > 0) {
                        log.info("capacity = {}", capacity);
                        boolean isUserTurn = rank < 10L;
                        eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, true, isUserTurn));
                    } else {
                        eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, false, false));
                    }
                }
                participantInfoQueueRepository.delete10Users(quizId, (long) tenUsersInQuiz.size());
            }

        }
    }
}

1초에 1번씩 queue 내부의 유저 정보들을 loop 타면서 유저 순서 정보를 전송합니다.
만약 유저가 앞의 10명 내에 포함되면 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대해 전달하는 이벤트를 발생시킵니다.

3. CustomEventListener

@Slf4j
@RequiredArgsConstructor
@Component
public class CustomEventListener {
    private final ResponsesFacade responsesFacade;
    private final ParticipantInfoFacade participantInfoFacade;
    private final SimpMessagingTemplate messagingTemplate;


    @Async
    @Transactional(value = "mongoTx",propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void sendMessage(ParticipantQueueInfoDto participantQueueInfoDto) {
        Long quizId = participantQueueInfoDto.quizId();
        Long userId = participantQueueInfoDto.userId();

        log.info("quizId = {}, userId = {}", quizId, userId);
        String endpoint = String.format("?quiz-id=%d&user-id=%d",quizId, userId);
        messagingTemplate.convertAndSend("/topic/participant" + endpoint, participantQueueInfoDto);
        //참여 가능 시
        if(participantQueueInfoDto.isCapacityLeft()) {
            log.info("save user start");
            participantInfoFacade.saveParticipants(quizId, userId);
        }

    }
}

QueueScheduler에서 이벤트가 발생되면 eventListener에서 websocket을 통해 사용자에게 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대한 정보를 전달합니다.

profile
어쩌다보니 개발하게 된 구황작물
post-custom-banner

0개의 댓글