대기열 플로우
: 오늘은 흔히 티케팅이나 , 온라인 예매에 사용되는 대기열에 대해서 공부하고 , 정리해볼 것입니다.
: 클라이언트는 NextJS를 사용하고 , 소켓을 관리해줄 서버로 Spring 서버를 이용하거 , 대기열 관리에 적합한 레디스를 사용할 것입니다.
(1) 클라이언트 ( Next.Js )
→ 방문 서버 및 대기열 서버로 가정
(2) 백엔드 ( Spring )
(3) 레디스 저장 서버
Q) 레디스 장점
Sorted Set 은 대기열 구조에 적합하다.& 단점
& 레디스가 적합한 경우
테스트 시나리오
/queue 페이지/queue/wait 페이지/queue 입장/queue/wait 로 이동됨/queue 로 입장 완료된다.테스트 모습
(1) /queue 페이지에서, 본 방에 20명을 입장시키기 버튼을 실행 한 후의 방문자 및 대기열 유저 모습입니다.
-> 방문자 10명
-> 대기자 10명

(2) 입장 버튼을 누른 후 , 현재 방문자가 이미 초가되었기에 , 대기 메세지를반환 받고 ,대기열 페이지로 이동되는 모습입니다.

(3) /queue/wait 에 입장되고 , 대기자가 10명이였기에 ,내 대기순번이 11명 받은 모습이다.

(4) 내 대기순번이 유지되는지 확인하기 위해 , 랜덤 유저를 입장시킨 후 , 내 대기순번은 그대로 유지되고 , 대기자수가 1면 늘어난 모습입니다.

(5) 1초간격으로 퇴장시키는 버튼이 싱행되고 , 대기자가 줄어들고 내 대기순번도 줄어드는 모습입니다.

(6) 대기자가 모두 빠지고 내 대기순번이 온 상태입니다. 그리고 , 다시 원하던 페이지로 이동됩니다.

(7) 본 페이지로 입장 완료!

코드 정리
(1) 클라이언트 코드
: 클라이언트 소켓 호출 로직
// hooks/useWebSocket.ts
import {useEffect, useRef, useState} from 'react';
import {useRouter} from "next/navigation";
export interface WebSocketMessage {
type: string;
userId: string;
}
export const useWebSocket = ({isSocketStart}: { isSocketStart: boolean }) => {
const [isConnected, setIsConnected] = useState(false);
const [visitorCount, setVisitorCount] = useState(0);
const [queueCount, setQueueCount] = useState(0);
const [maxCount, setMaxCount] = useState(0);
const [isAllowedToEnter, setIsAllowedToEnter] = useState(false); // 입장 가능 여부 상태 추가
const [error, setError] = useState<string | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const router = useRouter();
const userId = 'user_10000';
useEffect(() => {
if (!isSocketStart) return;
// Generate or retrieve userId
wsRef.current = new WebSocket('ws://localhost:8080/blog');
wsRef.current.onopen = () => {
setIsConnected(true);
// 입장 신청
sendMessage({
type: 'ENTER',
userId: userId
});
};
wsRef.current.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received data:', data);
switch (data.type) {
//
case 'VISITOR_COUNT':
setVisitorCount(data.count);
setQueueCount(data.queueCount);
setMaxCount(data.maxCount);
break;
// 대기 상태일 경우 대기 페이지로 보내기
case 'QUEUE':
router.push('/queue/open');
break;
// 입장 허용
case 'ALLOWED':
setIsAllowedToEnter(true); // 입장 가능 상태 변경
alert('입장이 허용되었습니다!');
break;
case 'ERROR':
setError(data.message);
break;
default:
console.warn('Unknown message type:', data.type); // 알 수 없는 메시지 타입 로깅
}
};
wsRef.current.onerror = (error) => {
console.error('WebSocket error:', error);
setError('연결 중 오류가 발생했습니다.');
};
return () => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
// sendMessage({
// type: 'LEAVE',
// userId: userId
// });
wsRef.current.close();
}
};
}, [isSocketStart]);
const sendMessage = (message: WebSocketMessage) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(message));
}
};
return {
isConnected,
visitorCount,
error,
isAllowedToEnter,
queueCount,
maxCount,
sendMessage,
userId
};
};
(1) 백엔드 코드
: 백엔드 소켓 로직
package jpabook.jpashop.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
@RequiredArgsConstructor
public class QueueWebSocketHandler extends TextWebSocketHandler {
private static final String BLOG_VISITORS_KEY = "blog:visitors";
private static final String BLOG_QUEUE_KEY = "blog:queue";
private static final int MAX_CONCURRENT_USERS = 10;
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 세션 관리 (sessionId 기반)
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
// userId -> WebSocketSession 매핑 (핵심 개선 포인트)
private final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.put(session.getId(), session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
JsonNode jsonMessage = new ObjectMapper().readTree(message.getPayload());
String type = jsonMessage.get("type").asText();
String userId = jsonMessage.get("userId").asText();
// userId를 세션에 저장하고, userSessions에도 등록
session.getAttributes().put("userId", userId);
userSessions.put(userId, session);
switch (type) {
case "ENTER":
handleEnterBlog(session, userId);
break;
case "LEAVE":
handleLeaveBlog(session, userId);
break;
case "CHECK_QUEUE_STATUS": // <-- 여기를 CHECK_QUEUE_STATUS로
handleQueueStatusCheck(session, userId);
break;
}
}
private void handleEnterBlog(WebSocketSession session, String userId) {
redisTemplate.execute(new SessionCallback<>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.watch(BLOG_VISITORS_KEY);
Long size = operations.opsForZSet().size(BLOG_VISITORS_KEY);
Double score = operations.opsForZSet().score(BLOG_VISITORS_KEY, userId);
operations.multi();
if (score == null && (size != null && size >= MAX_CONCURRENT_USERS)) {
// 이미 대기열에 있는지 확인 (중복 방지 로직 추가)
List<String> queue = operations.opsForList().range(BLOG_QUEUE_KEY, 0, -1);
if (queue == null || !queue.contains(userId)) {
operations.opsForList().rightPush(BLOG_QUEUE_KEY, userId);
}
List<Object> execResults = operations.exec();
if (execResults != null && !execResults.isEmpty()) {
sendQueueMessage(session, "대기열에 추가되었습니다. 순서를 기다려 주세요.");
} else {
// 트랜잭션 충돌로 인해 실패한 경우 처리
sendQueueMessage(session, "대기열 추가에 실패했습니다. 다시 시도해 주세요.");
}
return null;
}
operations.opsForZSet().add(BLOG_VISITORS_KEY, userId, System.currentTimeMillis());
List<Object> execResults = operations.exec();
if (execResults != null && !execResults.isEmpty()) {
sendAllowedMessage(session, "입장이 허용되었습니다!");
broadcastVisitorCount();
} else {
// 트랜잭션 충돌로 인해 입장 실패 시 처리
sendQueueMessage(session, "방문자 등록에 실패했습니다. 다시 시도해 주세요.");
}
return execResults;
}
});
}
private void handleLeaveBlog(WebSocketSession session, String userId) {
redisTemplate.execute(new SessionCallback<>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.watch(BLOG_VISITORS_KEY);
operations.multi();
operations.opsForZSet().remove(BLOG_VISITORS_KEY, userId);
operations.opsForList().remove(BLOG_QUEUE_KEY, 0, userId);
operations.exec();
processNextUserInQueue();
broadcastVisitorCount();
return null;
}
});
userSessions.remove(userId); // 세션 관리에서도 제거
}
private void processNextUserInQueue() {
Long size = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);
if (size != null && size < MAX_CONCURRENT_USERS) {
String nextUser = redisTemplate.opsForList().leftPop(BLOG_QUEUE_KEY);
if (nextUser != null) {
redisTemplate.opsForZSet().add(BLOG_VISITORS_KEY, nextUser, System.currentTimeMillis());
WebSocketSession nextSession = userSessions.get(nextUser);
if (nextSession != null && nextSession.isOpen()) {
sendAllowedMessage(nextSession, "입장이 허용되었습니다!");
} else {
// 세션이 없으면 다음 유저 탐색
processNextUserInQueue();
}
}
}
}
private void sendTextMessage(WebSocketSession session, ObjectNode message) {
try {
session.sendMessage(new TextMessage(message.toString()));
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleQueueStatusCheck(WebSocketSession session, String userId) {
Long position = getQueuePosition(userId);
Long visitorCount = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);
Long queueCount = redisTemplate.opsForList().size(BLOG_QUEUE_KEY);
boolean isAllowed = false;
// 방문자수가 꽉 차지 않았고, 대기열 맨 앞이 현재 사용자라면 입장 허용
if (visitorCount != null && visitorCount < MAX_CONCURRENT_USERS && isFirstInQueue(userId)) {
// 방문자 목록에 추가
redisTemplate.opsForZSet().add(BLOG_VISITORS_KEY, userId, System.currentTimeMillis());
// 대기열에서 제거
redisTemplate.opsForList().remove(BLOG_QUEUE_KEY, 1, userId);
// 방문자 수 갱신
broadcastVisitorCount();
isAllowed = true;
}
ObjectNode message = new ObjectMapper().createObjectNode();
message.put("type", "QUEUE_STATUS");
message.put("allowed", isAllowed);
if (isAllowed) {
message.put("message", "입장이 허용되었습니다.");
} else if (position != null) {
message.put("message", "현재 대기 순번: " + position);
} else {
message.put("message", "대기열에서 찾을 수 없습니다.");
}
message.put("visitorCount", visitorCount != null ? visitorCount : 0);
message.put("queueCount", queueCount != null ? queueCount : 0);
message.put("maxCount", MAX_CONCURRENT_USERS);
sendTextMessage(session, message);
}
private Long getQueuePosition(String userId) {
List<String> queue = redisTemplate.opsForList().range(BLOG_QUEUE_KEY, 0, -1);
if (queue == null) return null;
for (int i = 0; i < queue.size(); i++) {
if (queue.get(i).equals(userId)) {
return (long) (i + 1);
}
}
return null;
}
private boolean isFirstInQueue(String userId) {
List<String> queue = redisTemplate.opsForList().range(BLOG_QUEUE_KEY, 0, 0);
return queue != null && !queue.isEmpty() && queue.get(0).equals(userId);
}
private void broadcastVisitorCount() {
ObjectNode message = new ObjectMapper().createObjectNode();
message.put("type", "VISITOR_COUNT");
Long visitorCount = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);
Long queueCount = redisTemplate.opsForList().size(BLOG_QUEUE_KEY);
message.put("count", visitorCount != null ? visitorCount : 0);
message.put("queueCount", queueCount != null ? queueCount : 0);
message.put("maxCount", MAX_CONCURRENT_USERS);
broadcastMessage(message);
}
private void broadcastMessage(ObjectNode message) {
String msgStr = message.toString();
sessions.values().forEach(session -> {
try {
session.sendMessage(new TextMessage(msgStr));
} catch (IOException e) {
e.printStackTrace();
}
});
}
private void sendAllowedMessage(WebSocketSession session, String message) {
sendMessage(session, "ALLOWED", message);
}
private void sendQueueMessage(WebSocketSession session, String message) {
sendMessage(session, "QUEUE", message);
}
// private void sendQueueStatusMessage(WebSocketSession session, Long position) {
// String msg = (position != null) ? "현재 대기 순번: " + position : "대기열에서 찾을 수 없습니다.";
// sendMessage(session, "QUEUE_STATUS", msg);
// }
private void sendMessage(WebSocketSession session, String type, String message) {
try {
ObjectNode jsonMessage = new ObjectMapper().createObjectNode();
jsonMessage.put("type", type);
jsonMessage.put("message", message);
session.sendMessage(new TextMessage(jsonMessage.toString()));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessions.remove(session.getId());
String userId = (String) session.getAttributes().get("userId");
if (userId != null) {
userSessions.remove(userId);
}
}
}
테스트의 한계점 및 알게된 점
& 구조적 한계
& 한 개의 클라이언트 서버를 사용
& 소켓 요청에 대해 알게된 점
ENTER 요청을 했다면, ENTER 에대 대한 요청만을 받을 수 있으며, LEAVE 요청도 받고 싶으면 ,백엔드 서버에서 ENTER 때 LEAVE 요청도 추가로 브로드캐스팅되게 해줘야 요청을 받을 수 있다.실제 대기열 서버는 어떻게 구성될까?
&보안
&서버 분리
Q 대기열 서버를 분리하는 이유는?
Q.대기열 서버가 과부하되면 어떻게 되나?
→ 주의점!
: Redis 활용으로 대기열 상태를 공유해야 한다.
QQ. 그러면 대기열 수를 얼마나 둬야하나?
& 추가적인 운영 정책
&비동기 이벤트 기반 처리
VQUEUE (가상 대기열)
: 그리고 , 좀 더 찾아보다가 더 효율적인 방법에 대해 알게 된점이 VQUEUE(가상 대기열) 서버를 두는 방식입니다. 현재의 내가 가상의 시나리오에 사용한 방식은 서버 직접 관리 방식이다.
& 동작 방식
& 장점
| 구분 | 현재 방식 (서버 대기열) | VQUEUE (Virtual Queue) |
|---|---|---|
| 대기열 관리 위치 | 서비스 서버에서 직접 관리 | 별도 프록시/대기열 서버에서 관리 |
| WebSocket 유지 | 계속 연결 필요 | 필요 없음 (프록시가 관리) |
| 부하 분산 | 서비스 서버가 직접 부담 | 서비스 서버 부하 분산 (VQUEUE 서버가 흡수) |
| 사용자 경험 | 직접 대기순번 확인 | 브라우저에 대기 화면 표시 |
| 트래픽 폭주 시 | 서비스 서버가 감당 | VQUEUE 서버가 감당 |
& VQUEUE 방식
브라우저 → VQUEUE → 서비스
& 서버사이드
브라우저 → 서비스 → 서버 → 서비스
정리