개인 프로젝트로 진행중인 선착순 퀴즈 서비스에 대기열 서비스를 추가하기로 했습니다.
아무래도 선착순 서비스이다보니 한번에 트래픽이 몰릴 것을 예상하고 만드는 것이 좋겠다 생각해서 만들게 되었습니다.
/wait/{endpoint}
)@GetMapping("/wait/{endpoint}")
public ResponseEntity<ResponseDto<?>> saveParticipant(@PathVariable("endpoint") String endpoint,
@AuthenticationPrincipal UserAccount user) {
// queue 에 참가자 추가
Users users = usersService.findByEmail(user.getUsername());
Long quizId = participantInfoFacade.getQuizIdByEndpoint(endpoint);
Long rank = participantInfoQueueService.addQueue(quizId, users.getId());
return ResponseEntity.ok(ResponseDto.success(new ParticipantQueueResponseDto(quizId, users.getId(),rank)));
}
사용자가 API에 접근하면 Waiting Queue(Redis SortedSet)에 추가합니다.
@Slf4j
@EnableScheduling
@Component
@RequiredArgsConstructor
public class QueueScheduler {
private final ParticipantInfoQueueRepository participantInfoQueueRepository;
private final ApplicationEventPublisher eventPublisher;
private final SimpMessagingTemplate messagingTemplate;
@Async
@Transactional(value = "redisTx")
@Scheduled(fixedDelay = 2000)
public void showRankAndPollUser() {
Set<Long> quizIdSet = participantInfoQueueRepository.getQuizIdSet();
for (Long quizId : quizIdSet) {
Set<Long> allUsersInQuiz = participantInfoQueueRepository.getAllUsers(quizId);
Set<Long> tenUsersInQuiz = participantInfoQueueRepository.get10Users(quizId);
for (Long userId : allUsersInQuiz) {
Long rank = participantInfoQueueRepository.getRank(quizId, userId);
rank = rank == null ? 0 : rank;
log.info("userId = {}, rank = {}", userId, rank);
String endpoint = String.format("?quiz-id=%d", quizId);
messagingTemplate.convertAndSend("/topic/rank" + endpoint, rank);
if (tenUsersInQuiz.contains(userId)) {
Integer capacity = participantInfoQueueRepository.getParticipantNumber(quizId);
if (capacity > 0) {
log.info("capacity = {}", capacity);
boolean isUserTurn = rank < 10L;
eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, true, isUserTurn));
} else {
eventPublisher.publishEvent(new ParticipantQueueInfoDto(quizId, userId, rank, false, false));
}
}
participantInfoQueueRepository.delete10Users(quizId, (long) tenUsersInQuiz.size());
}
}
}
}
1초에 1번씩 queue 내부의 유저 정보들을 loop 타면서 유저 순서 정보를 전송합니다.
만약 유저가 앞의 10명 내에 포함되면 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대해 전달하는 이벤트를 발생시킵니다.
@Slf4j
@RequiredArgsConstructor
@Component
public class CustomEventListener {
private final ResponsesFacade responsesFacade;
private final ParticipantInfoFacade participantInfoFacade;
private final SimpMessagingTemplate messagingTemplate;
@Async
@Transactional(value = "mongoTx",propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void sendMessage(ParticipantQueueInfoDto participantQueueInfoDto) {
Long quizId = participantQueueInfoDto.quizId();
Long userId = participantQueueInfoDto.userId();
log.info("quizId = {}, userId = {}", quizId, userId);
String endpoint = String.format("?quiz-id=%d&user-id=%d",quizId, userId);
messagingTemplate.convertAndSend("/topic/participant" + endpoint, participantQueueInfoDto);
//참여 가능 시
if(participantQueueInfoDto.isCapacityLeft()) {
log.info("save user start");
participantInfoFacade.saveParticipants(quizId, userId);
}
}
}
QueueScheduler에서 이벤트가 발생되면 eventListener에서 websocket을 통해 사용자에게 유저 순서 정보와 함께 유저가 선착순 안에 들었는지, 실패했는지에 대한 정보를 전달합니다.