대기열 서버 관리 ( Socket , VQUEUE )

성찬홍·2025년 3월 2일

아키텍쳐

목록 보기
1/1
post-thumbnail

대기열 플로우

: 오늘은 흔히 티케팅이나 , 온라인 예매에 사용되는 대기열에 대해서 공부하고 , 정리해볼 것입니다.

1. 테스트 스펙

: 클라이언트는 NextJS를 사용하고 , 소켓을 관리해줄 서버로 Spring 서버를 이용하거 , 대기열 관리에 적합한 레디스를 사용할 것입니다.

(1) 클라이언트 ( Next.Js )
→ 방문 서버 및 대기열 서버로 가정
(2) 백엔드 ( Spring )
(3) 레디스 저장 서버

Q) 레디스 장점

  • 빠른 성능
    • 인메모리 데이터베이스로 , RDBMS 등을 사용하는 것보다 훨씬 빠른 속도로 데이터를 읽고 쓸 수 있어서 , 대기열 시스템에서 실시간으로 데이터를 처리하기에 적합하다.
  • 데이터 구조 지원
    • List,Set, Sorted Set 등의 다양한 자료구조를 지원하여 대기열 구현에 유용하다.
      Sorted Set 은 대기열 구조에 적합하다.
  • 원자적 트랜잭션을 지원해서 , 대기열의 일관성을 유지할 수 있다.
  • 클러스터링을 통해 수평적 확장이 가능하고 ,대규모 트래픽 처리에 효과적이다.

& 단점

  • 인메모리 저장소이기 때문에 , 서버가 재시작되면 데이터가 사라질 수 있다.
    • AOF설정이나 RDB 설정으로 영속성을 보완할 수 있지만, RDB만큼 안정적이지는 않다
  • 모든 데이터를 메모리에 저장하기에, 메모리 용량에 제한이 있다.

& 레디스가 적합한 경우

  • 빠른 속도와 간단한 구현이 중요한 경우 ( 실시간 대기열 , 캐시 기반 대기열 )
  • 대기열에 대한 복잡한 기능이 없고 , 인메모리 처리가 충분한 경우

테스트 시나리오

테스트 시나리오

  • /queue 페이지
    ⇒ 방문을 원하는 홈페이지로 가정
  • /queue/wait 페이지
    ⇒ 부하를 줄이기 위한 대기열 서버로 가정
  1. /queue 입장
  2. 유저 생성 버튼으로, 임의의 1~20 user를 입장
    a. 최대 방문자를 10명으로 지정
    b. 방문자 테이블에 10명 추가
    c. 대기열 테이블에 10명 추가
  3. 본인(user_10000번)인 입장 버튼 클릭
    a. 현재 방문자가 10명 이므로 대기열에 추가되고 , /queue/wait 로 이동됨
  4. 이동된 페이지에서 , 일정 간격으로 상태 확인 소켓 메세지를 호출
  5. 현재 대기열에 추가된 본인의 순번이 다른 입장자가 생겨도 유지되는지 확인하기 위해 , 유저 랜덤 입장버튼을 사용
  6. 임의로 만든 유저이들이게 , 1초 간격으로 랜덤하게 방문자가 떠나게 하는 버튼을 호출
    a. 1초간격으로 방문자가 나가고 ,대기열의 우선순위 방문자가 추가된다
    b. 그리고, 나의 순번이 오면 입장이 되고 , /queue 로 입장 완료된다.

테스트 모습

(1) /queue 페이지에서, 본 방에 20명을 입장시키기 버튼을 실행 한 후의 방문자 및 대기열 유저 모습입니다.

-> 방문자 10명
-> 대기자 10명

(2) 입장 버튼을 누른 후 , 현재 방문자가 이미 초가되었기에 , 대기 메세지를반환 받고 ,대기열 페이지로 이동되는 모습입니다.

(3) /queue/wait 에 입장되고 , 대기자가 10명이였기에 ,내 대기순번이 11명 받은 모습이다.

(4) 내 대기순번이 유지되는지 확인하기 위해 , 랜덤 유저를 입장시킨 후 , 내 대기순번은 그대로 유지되고 , 대기자수가 1면 늘어난 모습입니다.

(5) 1초간격으로 퇴장시키는 버튼이 싱행되고 , 대기자가 줄어들고 내 대기순번도 줄어드는 모습입니다.

(6) 대기자가 모두 빠지고 내 대기순번이 온 상태입니다. 그리고 , 다시 원하던 페이지로 이동됩니다.

(7) 본 페이지로 입장 완료!

코드 정리

(1) 클라이언트 코드
: 클라이언트 소켓 호출 로직

// hooks/useWebSocket.ts
import {useEffect, useRef, useState} from 'react';
import {useRouter} from "next/navigation";

export interface WebSocketMessage {
    type: string;
    userId: string;
}

export const useWebSocket = ({isSocketStart}: { isSocketStart: boolean }) => {
    const [isConnected, setIsConnected] = useState(false);
    const [visitorCount, setVisitorCount] = useState(0);
    const [queueCount, setQueueCount] = useState(0);
    const [maxCount, setMaxCount] = useState(0);

    const [isAllowedToEnter, setIsAllowedToEnter] = useState(false);  // 입장 가능 여부 상태 추가
    const [error, setError] = useState<string | null>(null);
    const wsRef = useRef<WebSocket | null>(null);
    const router = useRouter();
    const userId = 'user_10000';


    useEffect(() => {
        if (!isSocketStart) return;
        // Generate or retrieve userId

        wsRef.current = new WebSocket('ws://localhost:8080/blog');

        wsRef.current.onopen = () => {
            setIsConnected(true);
            // 입장 신청
            sendMessage({
                type: 'ENTER',
                userId: userId
            });
        };

        wsRef.current.onmessage = (event) => {
            const data = JSON.parse(event.data);
            console.log('Received data:', data);

            switch (data.type) {
                //
                case 'VISITOR_COUNT':
                    setVisitorCount(data.count);
                    setQueueCount(data.queueCount);
                    setMaxCount(data.maxCount);
                    break;

                // 대기 상태일 경우 대기 페이지로 보내기
                case 'QUEUE':
                    router.push('/queue/open');
                    break;
                // 입장 허용
                case 'ALLOWED':
                    setIsAllowedToEnter(true);  // 입장 가능 상태 변경
                    alert('입장이 허용되었습니다!');
                    break;

                case 'ERROR':
                    setError(data.message);
                    break;
                default:
                    console.warn('Unknown message type:', data.type);  // 알 수 없는 메시지 타입 로깅
            }
        };

        wsRef.current.onerror = (error) => {
            console.error('WebSocket error:', error);
            setError('연결 중 오류가 발생했습니다.');
        };

        return () => {
            if (wsRef.current?.readyState === WebSocket.OPEN) {
                // sendMessage({
                //     type: 'LEAVE',
                //     userId: userId
                // });
                wsRef.current.close();
            }
        };
    }, [isSocketStart]);

    const sendMessage = (message: WebSocketMessage) => {
        if (wsRef.current?.readyState === WebSocket.OPEN) {
            wsRef.current.send(JSON.stringify(message));
        }
    };

    return {
        isConnected,
        visitorCount,
        error,
        isAllowedToEnter,
        queueCount,
        maxCount,
        sendMessage,
        userId
    };
};

(1) 백엔드 코드
: 백엔드 소켓 로직

package jpabook.jpashop.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
@RequiredArgsConstructor
public class QueueWebSocketHandler extends TextWebSocketHandler {

    private static final String BLOG_VISITORS_KEY = "blog:visitors";
    private static final String BLOG_QUEUE_KEY = "blog:queue";
    private static final int MAX_CONCURRENT_USERS = 10;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // 세션 관리 (sessionId 기반)
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    // userId -> WebSocketSession 매핑 (핵심 개선 포인트)
    private final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.put(session.getId(), session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        JsonNode jsonMessage = new ObjectMapper().readTree(message.getPayload());
        String type = jsonMessage.get("type").asText();
        String userId = jsonMessage.get("userId").asText();

        // userId를 세션에 저장하고, userSessions에도 등록
        session.getAttributes().put("userId", userId);
        userSessions.put(userId, session);

        switch (type) {
            case "ENTER":
                handleEnterBlog(session, userId);
                break;
            case "LEAVE":
                handleLeaveBlog(session, userId);
                break;
            case "CHECK_QUEUE_STATUS":  // <-- 여기를 CHECK_QUEUE_STATUS로
                handleQueueStatusCheck(session, userId);
                break;
        }

    }


    private void handleEnterBlog(WebSocketSession session, String userId) {
        redisTemplate.execute(new SessionCallback<>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.watch(BLOG_VISITORS_KEY);

                Long size = operations.opsForZSet().size(BLOG_VISITORS_KEY);
                Double score = operations.opsForZSet().score(BLOG_VISITORS_KEY, userId);

                operations.multi();

                if (score == null && (size != null && size >= MAX_CONCURRENT_USERS)) {
                    // 이미 대기열에 있는지 확인 (중복 방지 로직 추가)
                    List<String> queue = operations.opsForList().range(BLOG_QUEUE_KEY, 0, -1);
                    if (queue == null || !queue.contains(userId)) {
                        operations.opsForList().rightPush(BLOG_QUEUE_KEY, userId);
                    }
                    List<Object> execResults = operations.exec();

                    if (execResults != null && !execResults.isEmpty()) {
                        sendQueueMessage(session, "대기열에 추가되었습니다. 순서를 기다려 주세요.");
                    } else {
                        // 트랜잭션 충돌로 인해 실패한 경우 처리
                        sendQueueMessage(session, "대기열 추가에 실패했습니다. 다시 시도해 주세요.");
                    }
                    return null;
                }

                operations.opsForZSet().add(BLOG_VISITORS_KEY, userId, System.currentTimeMillis());
                List<Object> execResults = operations.exec();

                if (execResults != null && !execResults.isEmpty()) {
                    sendAllowedMessage(session, "입장이 허용되었습니다!");
                    broadcastVisitorCount();
                } else {
                    // 트랜잭션 충돌로 인해 입장 실패 시 처리
                    sendQueueMessage(session, "방문자 등록에 실패했습니다. 다시 시도해 주세요.");
                }

                return execResults;
            }
        });
    }


    private void handleLeaveBlog(WebSocketSession session, String userId) {
        redisTemplate.execute(new SessionCallback<>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.watch(BLOG_VISITORS_KEY);

                operations.multi();
                operations.opsForZSet().remove(BLOG_VISITORS_KEY, userId);
                operations.opsForList().remove(BLOG_QUEUE_KEY, 0, userId);
                operations.exec();

                processNextUserInQueue();

                broadcastVisitorCount();
                return null;
            }
        });

        userSessions.remove(userId);  // 세션 관리에서도 제거
    }

    private void processNextUserInQueue() {
        Long size = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);

        if (size != null && size < MAX_CONCURRENT_USERS) {
            String nextUser = redisTemplate.opsForList().leftPop(BLOG_QUEUE_KEY);
            if (nextUser != null) {
                redisTemplate.opsForZSet().add(BLOG_VISITORS_KEY, nextUser, System.currentTimeMillis());
                WebSocketSession nextSession = userSessions.get(nextUser);

                if (nextSession != null && nextSession.isOpen()) {
                    sendAllowedMessage(nextSession, "입장이 허용되었습니다!");
                } else {
                    // 세션이 없으면 다음 유저 탐색
                    processNextUserInQueue();
                }
            }
        }
    }

    private void sendTextMessage(WebSocketSession session, ObjectNode message) {
        try {
            session.sendMessage(new TextMessage(message.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    private void handleQueueStatusCheck(WebSocketSession session, String userId) {
        Long position = getQueuePosition(userId);
        Long visitorCount = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);
        Long queueCount = redisTemplate.opsForList().size(BLOG_QUEUE_KEY);

        boolean isAllowed = false;

        // 방문자수가 꽉 차지 않았고, 대기열 맨 앞이 현재 사용자라면 입장 허용
        if (visitorCount != null && visitorCount < MAX_CONCURRENT_USERS && isFirstInQueue(userId)) {
            // 방문자 목록에 추가
            redisTemplate.opsForZSet().add(BLOG_VISITORS_KEY, userId, System.currentTimeMillis());
            // 대기열에서 제거
            redisTemplate.opsForList().remove(BLOG_QUEUE_KEY, 1, userId);
            // 방문자 수 갱신
            broadcastVisitorCount();
            isAllowed = true;
        }

        ObjectNode message = new ObjectMapper().createObjectNode();
        message.put("type", "QUEUE_STATUS");
        message.put("allowed", isAllowed);

        if (isAllowed) {
            message.put("message", "입장이 허용되었습니다.");
        } else if (position != null) {
            message.put("message", "현재 대기 순번: " + position);
        } else {
            message.put("message", "대기열에서 찾을 수 없습니다.");
        }

        message.put("visitorCount", visitorCount != null ? visitorCount : 0);
        message.put("queueCount", queueCount != null ? queueCount : 0);
        message.put("maxCount", MAX_CONCURRENT_USERS);

        sendTextMessage(session, message);
    }


    private Long getQueuePosition(String userId) {
        List<String> queue = redisTemplate.opsForList().range(BLOG_QUEUE_KEY, 0, -1);
        if (queue == null) return null;

        for (int i = 0; i < queue.size(); i++) {
            if (queue.get(i).equals(userId)) {
                return (long) (i + 1);
            }
        }
        return null;
    }

    private boolean isFirstInQueue(String userId) {
        List<String> queue = redisTemplate.opsForList().range(BLOG_QUEUE_KEY, 0, 0);
        return queue != null && !queue.isEmpty() && queue.get(0).equals(userId);
    }


    private void broadcastVisitorCount() {
        ObjectNode message = new ObjectMapper().createObjectNode();
        message.put("type", "VISITOR_COUNT");

        Long visitorCount = redisTemplate.opsForZSet().size(BLOG_VISITORS_KEY);
        Long queueCount = redisTemplate.opsForList().size(BLOG_QUEUE_KEY);

        message.put("count", visitorCount != null ? visitorCount : 0);
        message.put("queueCount", queueCount != null ? queueCount : 0);
        message.put("maxCount", MAX_CONCURRENT_USERS);

        broadcastMessage(message);
    }

    private void broadcastMessage(ObjectNode message) {
        String msgStr = message.toString();
        sessions.values().forEach(session -> {
            try {
                session.sendMessage(new TextMessage(msgStr));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    private void sendAllowedMessage(WebSocketSession session, String message) {
        sendMessage(session, "ALLOWED", message);
    }

    private void sendQueueMessage(WebSocketSession session, String message) {
        sendMessage(session, "QUEUE", message);
    }

//    private void sendQueueStatusMessage(WebSocketSession session, Long position) {
//        String msg = (position != null) ? "현재 대기 순번: " + position : "대기열에서 찾을 수 없습니다.";
//        sendMessage(session, "QUEUE_STATUS", msg);
//    }

    private void sendMessage(WebSocketSession session, String type, String message) {
        try {
            ObjectNode jsonMessage = new ObjectMapper().createObjectNode();
            jsonMessage.put("type", type);
            jsonMessage.put("message", message);
            session.sendMessage(new TextMessage(jsonMessage.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session.getId());
        String userId = (String) session.getAttributes().get("userId");
        if (userId != null) {
            userSessions.remove(userId);
        }
    }
}

테스트의 한계점 및 알게된 점

& 구조적 한계

  • 임의로 넣는 테스트이기에 여러 IP로 들어오는 유저들의 유연한 테스트는 불가능하기에 , 예상치 못한 오류가 있을 수 있다.
    • 동일 클라이언트에서 다수의 유저를 생성하고 관리하기 때문에 실제 분산 환경과 다름.
  • 대기열 서버를 따로 두는게 일반적인 방법일 것인데 , 같은 클라이언트 서버를 두기에 구현에 대기열 플로우 구현에 둔 테스트이다.

& 한 개의 클라이언트 서버를 사용

  • 페이지 이동시에 , 언마운트되면서 , LEAVE 처리가 되는데 이게 겹쳐서 현재는 주석처리

& 소켓 요청에 대해 알게된 점

  • 소켓을 연결했다고해서 , 모든 이벤트를 감지 할 수 있는 것이 아니다.
    • ENTER 요청을 했다면, ENTER 에대 대한 요청만을 받을 수 있으며, LEAVE 요청도 받고 싶으면 ,백엔드 서버에서 ENTERLEAVE 요청도 추가로 브로드캐스팅되게 해줘야 요청을 받을 수 있다.

실제 대기열 서버는 어떻게 구성될까?

&보안

  • 대기열을 통과한 유저만 접근이 가능하게 해준다.
  • 대기열에서 ‘허용’ 요청을 받은 유저가 JWT를 발급 받아서 입장을 혀용해준다.

&서버 분리

  • 대기열 관리 서버와 실제 서비스 서버는 분리해준다.
  • 대기열 서버는 사용자 수, 순번 관리만 담당해준다.
  • 실제 서비스 서버는 비즈니스 로직 처리 담당해준다.

Q 대기열 서버를 분리하는 이유는?

  • 대기열 관리와 실제 서비스는 트래픽 특성이 다릅니다.
  • 대기열은 실시간으로 많은 요청(입장, 퇴장, 순번 확인 등)을 처리해야 합니다.
  • 실제 서비스는 비즈니스 로직, 데이터 조회 등 다른 작업을 합니다.
  • 서로 성격이 다른 트래픽이 한 서버에 몰리면 성능 저하, 장애 가능성이 높아집니다.
  • 따라서 대기열 전용 서버로 분리해 트래픽을 분산합니다.

Q.대기열 서버가 과부하되면 어떻게 되나?

  • 대기열 서버를 여러 대 운영한다.
    • 대기열 서버를 여러 대로 늘리고 (N대 운영), 접속자를 부하 분산(Load Balancing)으로 나눕니다.
    • Sticky Session (세션 고정), userId는 처음 연결된 세션에 계속 붙어 있어야 한다.

→ 주의점!
: Redis 활용으로 대기열 상태를 공유해야 한다.

  • 각 대기열 서버가 각자 대기열을 관리하면 대기열 순서 꼬일 수 있다.
  • 따라서 모든 대기열 서버가 하나의 Redis를 공유해서 중앙관리하는 구조가 필요하다.

QQ. 그러면 대기열 수를 얼마나 둬야하나?

  • 일반적으로 웹소켓 서버 1대당 1~2만 세션 정도가 현실적인 한계입니다.
    • 이를 고려해서 , 사용자를 산정하야 둬야한다.

& 추가적인 운영 정책

  • 필요 시 Auto Scaling 설정
  • 트래픽 초과 시 일부 차단 정책

&비동기 이벤트 기반 처리

  • 대기열 퇴장 시 Kafka, Redis Pub/Sub로 다음 순번 처리
  • WebSocket은 단순 알림용
  • 대기열 서버는 상태 저장, 이벤트 발행 역할만 수행

VQUEUE (가상 대기열)

: 그리고 , 좀 더 찾아보다가 더 효율적인 방법에 대해 알게 된점이 VQUEUE(가상 대기열) 서버를 두는 방식입니다. 현재의 내가 가상의 시나리오에 사용한 방식은 서버 직접 관리 방식이다.

VQUEUE (가상 대기열 )

  • 실제 서버에 부하를 주지 않고, 사용자 브라우저나 프록시 단계에서 가상으로 대기열을 관리하는 방식
  • 사용자가 직접 서버에 접속해서 줄을 서는게 아니라 , “대기중이다”라는 화면을 보여주면서 서버에 주지 않고 , 서버 과부하를 막기 위한 기법이다.

& 동작 방식

  • 사용자가 서비스 접속 요청
  • 서비스 서버가 직접 응답하지 않고, 대기열 서버 (V 큐)가 응답
  • V 큐 서버가 대기열 순서를 부여하고, 대기 페이지를 응답
  • 대기열 순서가 되면, 실제 서비스 서버로 요청을 넘김
  • 사용자는 실제 서비스 화면을 보게 됨

& 장점

  • 메인 서비스 서버 보호
  • UX 개선
  • 순차 입장 보장
구분현재 방식 (서버 대기열)VQUEUE (Virtual Queue)
대기열 관리 위치서비스 서버에서 직접 관리별도 프록시/대기열 서버에서 관리
WebSocket 유지계속 연결 필요필요 없음 (프록시가 관리)
부하 분산서비스 서버가 직접 부담서비스 서버 부하 분산 (VQUEUE 서버가 흡수)
사용자 경험직접 대기순번 확인브라우저에 대기 화면 표시
트래픽 폭주 시서비스 서버가 감당VQUEUE 서버가 감당

& VQUEUE 방식
브라우저 → VQUEUE → 서비스

& 서버사이드
브라우저 → 서비스 → 서버 → 서비스

정리

  • 우리가 흔히 보던 대기열의 방식의 구조에 대해서 어느정도 알아볼 수 있었습니다.
  • 서버 관리에 있어서는 서버에 비용을 들이는만큼 해결할 수 있는 것도 있는데, 이를 최소화하기 위해 오토스케일링같은 기술들이 나오게되는게 아닌가싶다.
  • 실제 대용량 트랙을 터리하는 홈페이지는 어느정도의 사용자를 커버할 수 있을 지 궁금해졌다.
profile
꾸준한 개발자

0개의 댓글