Redis Stream + SSE를 통한 대기열 구현 모식

동준·6일 전
0

개인공부(스프링)

목록 보기
20/20
post-thumbnail

대기열(queue) 구현

1. 개요

대용량 트래픽이 몰리는 경우, 가장 문제가 될 수 있는 부분은 순서 정립이다. 예를 들어 티켓팅 서비스에서 선착순에 맞춰 인원을 컷하고 티켓을 제공해야되지만 트래픽이 몰리는 상황 속에서 어떤 요청이 먼저 왔는지 우열을 파악하기가 쉽지 않다. 즉 동시성 이슈가 발생할 우려가 커진다.

서비스 제공 측면에서는 위와 같고 실제 서버를 운용하는 개발자 입장에서 바라보자면, 특정 요청과 관련하여 트래픽이 몰리면서 부하가 급증하는 상황이 있을 때, 부하를 적절히 분배 처리함과 동시에 처리 결과의 정합성을 보장해야 하는 고민거리가 생긴다.

이 문제를 해결하기 위한 방법이 바로 대기열 구현인데, 사실 대기열이란 게 그리 특별한 게 아니라 우리가 흔히 마주하는 자료구조 중 하나인 큐(FIFO)다. 즉 동시다발적으로 들어오는 수많은 요청들을 큐에 집어넣어서 순서를 강제로 정립한 다음, 순차적으로 요청을 처리하면서 부하를 제어하는 것이 대기열의 기본 흐름이고 여기서 대기열의 시작점과 도착점을 병렬 확장시키면 부하 분산의 효과도 기대할 수 있다.

1) 대기열 구현을 위한 요소

저 병렬 확장을 위해 대기열에서 꺼내 처리하는 과정이 병렬적으로 이뤄질 수 있는 기능이 요구되는데, 이를 위해서 대기열은 보통 메세지 큐 시스템을 활용해서 복수의 컨슈머(대기열 처리 서버)가 작업 처리를 중복 없이 수행하도록 한다. 통상 사용되는 메세지 큐에는 Kafka, RabbitMQ, Redis Stream 등이 있다.

또한, 요청들을 대기열에 넣은 다음에 순차적으로 병렬 처리가 이뤄지기 때문에 사용자 경험 측면에서 실시간으로 자신의 요청이 어느 정도 처리됐는지를 나타내기 위한 실시간 연결이 요구되는데 WebSocket이나 SSE를 활용할 수 있다.

이번 모식 구현에서는 Redis StreamSSE를 활용했다.

2) Redis Stream

메세지 큐의 역할을 한다는 점에서 얼핏 보면 메세지브로커인 Redis Pub/Sub과 유사해보이기도 한다. 그 차이를 설명하자면...

단순히 구독자에게 메세지를 전달하면서 순서를 보장하지 않고 실시간 전달에만 집중하며, 따로 로그가 남지 않고 휘발되는 Redis Pub/Sub과 달리, 아예 Redis에서 제공하는 자료구조 중 하나로써 Stream을 활용하면 메세지의 로그를 남기면서 각 메세지별로 고유 ID를 제공함과 동시에 읽음 처리(XGROUPREAD)에 따라 대기 상태(PENDING) 전환 및 대기열 배제(XACK) 처리 등을 활용할 수 있다.

또한 컨슈머 그룹에 따라서 메세지 처리 대상을 묶어 분류할 수도 있기 때문에, 메세지를 여러 컨슈머들이 중복 없이 목적에 맞춰 처리할 수 있게 된다. 좀 더 자세한 플로우는 아래에서 설명한다.

3) SSE(Server Sent Event)

본래 클라이언트와 서버는 무상태성을 전제로 요청과 응답이 오고간다. 즉, 요청이 없으면 서버와 클라이언트는 연결될 일이 없지만, 채팅 등 실시간성이 요구되는 기능에서는 무상태성을 활용할 수 없기 때문에 좀 더 업그레이드 된 연결 방법이 요구된다.

그 중에 대표적으로 폴링웹소켓이 있는데, 폴링은 지속적인 요청을 계속 보내야 하므로 리소스에 상당한 부담이 가해지게 된다. 그래서 생각할 수 있는 게 웹소켓인데, 웹소켓은 클라이언트와 서버 간 쌍방 실시간 통신이 가능하다는 점에서 대기열 구현에서는 오버 엔지니어링이 될 수도 있다. 왜냐하면 서버의 실시간 상황을 클라이언트에게 보고하는 점에서 그치면 되지 굳이 클라이언트가 실시간으로 서버에게 요청을 보낼 일은 없기 때문이다. 그래서 대기열 구현과 같이 서버가 실시간으로 클라이언트에게 정보를 전달해야 할 경우에는 SSE를 활용한다. HTML5의 표준안이면서 재접속 같은 저수준의 처리를 자동 지원해준다.

2. 대기열 구현

1) 아키텍처

하나의 서버에서 모든 것(트래픽 수용, 대기열 처리)을 구현할 수도 있지만, 대기열의 시작점과 종단점을 구별해서 서버의 역할을 분리, 책임을 명확하게 나누기 위해 트래픽 수용 서버(8080번 포트)대기열 처리 서버(8081번 포트)를 간단하게 스프링부트로 구현한다.

트래픽 수용 서버에서 클라이언트로부터 요청을 받아들이며 대기열 처리 서버에서 SSE를 연결하여 대기열 처리 현황을 클라이언트에게 실시간 전달해준다.

2) 트래픽 수용 서버

Redis Stream이 대기열의 역할을 맡으므로, 트래픽 수용 서버는 요청을 받아들여서 대기열에게 해당 요청을 넘기게 된다. 여기서 Redis Stream에게 XADD를 통해 메세지를 넘겨야 하는데, 서버에서는 RedisTemplate를 통해 이뤄진다.

@Slf4j
@RestController
@RequestMapping("/queue")
@RequiredArgsConstructor
public class TrafficController {

    private static final String STREAM_KEY = "queue";

    private final RedisTemplate<String, String> redisTemplate;

    @PostMapping("/join")
    public ResponseEntity<JoinDTO> joinQueue(@RequestParam String userId) {
        RecordId recordId = redisTemplate.opsForStream()
                .add(STREAM_KEY, Map.of("userId", userId));

        log.info("대기열 참가: {} / {}", userId, recordId.getValue());
        return ResponseEntity.ok(new JoinDTO(userId, recordId.getValue()));
    }
}

스트림 키를 통해 어떤 큐에게 메세지를 넘길지를 지정해주면, 해당 메세지의 고유 식별값(RecordId)이 반환된다. 저 메세지 식별값을 활용해서 현재 대기열 내에서 몇 번째로 처리되고 있는지를 활용할 수 있다.

메세지 식별값은 현재 시간을 밀리세컨드로 표현하고 동일 시간대에는 인덱스를 부여하면서 고유성을 확보하게 된다. 이를 활용해서 Redis CLI에서 직접 XACK, XCLAIM, XPENDING 등을 수행할 수 있다.

만약 트래픽 수용과 대기열 처리가 같은 서버에서 이뤄졌으면 손쉽게 recordId 변수를 다른 서비스로 넘길 수 있겠지만, 나는 서버를 분리했기 때문에 다른 전달방법을 써야 하는데, 생각했던 방법은 2가지다.

  1. Redis에 저장해서 대기열 처리 서버에서 조회
  2. 클라이언트로 전달해서 SSE 연결 과정에서 대기열 처리 서버에게 같이 요청

나는 2번을 선택했는데, 이유는 내 나름대로 고난의 길을 걷기 위함(...)이다. SSE의 연결과 Redis Stream 메세지 수신의 비정합성 해결을 위한 방법을 고민하려고 2번을 선택했는데(물론 1번도 똑같은 이슈가 발생하지만, 2번보다는 간극이 덜할 것으로 생각) 생각해보니 MSA 채팅앱 구현에서 저 이슈를 해결한 적이 있다. 그 얘기는 대기열 처리 서버에서 설명할 예정.

구현이 정상적으로 이뤄졌다면, Redis Stream에 메세지를 실을 수 있다.

3) 대기열 처리 서버

대기열 처리 서버는 SSE 연결을 받아들임과 동시에 Redis Stream에서 메세지를 비동기적으로 수신한다. 잠시 Redis Stream에서의 메세지 처리 과정을 설명하자면...

  1. Redis Stream에 메세지가 추가된다(XADD). 각 메세지는 키와 값으로 이뤄진다.
    XADD <stream_name> <id> <field1> <value1> <field2> <value2> ...
  2. 메세지를 읽는다(XREADGROUP). 이때, 메세지를 컨슈머 그룹을 활용하여 읽으면 동일 컨슈머 그룹에 속한 소비자가 메세지를 분배해서 읽고 이것을 Redis가 메세지 식별값으로 추적해서 중복 처리를 방지한다.
    XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream >
  3. 읽은 메세지는 대기 상태로 전환된다(XPENDING)
    XPENDING mystream mygroup
  4. 대기 상태로 전환된 메세지를 확인 처리하면 대기열에서 빠져나온다(XACK)
    XACK <stream_name> <group_name> <message_id>

우리가 직접 관여할 스프링부트 서버에서 RedisTemplate을 통해 주로 다룰 부분은 4번이며, 2번은 사전 서버 세팅을 통해 부팅과 동시에 메세지를 수신하면서 읽음 처리를 수행한다. 2번이 이뤄지면 자동으로 3번이 이뤄지기 때문에 3번 이후에 대기열 처리(예약, 결제 등등)를 수행하고 4번을 코드로 호출한다. 참고로 1번은 이미 트래픽 수용 서버에서 이뤄지고 있다.

위의 과정은 비단 대기열 구현 외에도 Redis Stream 내에서의 메세지 생명주기를 나타내는 설명이기도 하다.

(1) 사전 세팅

일단 Redis 설정에서 (당연히) RedisTemplate을 빈으로 등록해주고, 스트림 메세지 리스너도 빈으로 등록해준다.

// 스트림 메세지 리스너 세팅
@Bean(name = "listenerContainer")
public StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory) {
    
    // ...커스텀한 메세지 리스너 컨테이너 세팅

    return StreamMessageListenerContainer.create(connectionFactory, options);
}

스트림 메세지 리스너는 스프링부트 서버가 Redis Stream으로부터 메세지를 읽어오게 하는 역할을 담당한다. 이를 활용해서 대기열 처리 서버에서 중재자구독자를 세팅하여, Pub/Sub 모델을 기반으로 메세지를 병렬 처리할 수 있도록 한다.

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStreamConsumer
        implements StreamListener<String, MapRecord<String, Object, Object>>, InitializingBean, DisposableBean {
        
    // ...

    private Subscription subscription;

    private final StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer;
    private final RedisTemplate<String, String> redisTemplate;
    
    // ...
    
    @Override
    public void onMessage(MapRecord<String, Object, Object> message) {
        // 대기열 처리(예약, 결제 등등...)

        // 수신 메세지 Ack 처리
        redisTemplate.opsForStream().acknowledge(CONSUMER_GROUP, message);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // ...큐 있는지 확인하고 없으면 큐 생성 + 컨슈머 그룹 세팅

        // 중재자 세팅(단일 컨슈머 그룹에서 메세지를 레디스 스트림 순서대로 수신)
        this.subscription = listenerContainer.receive(
                Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                this
        );

        // Redis Listen 시작
        this.listenerContainer.start();
    }

코드가 상당히 복잡한데, 일단 핵심은 앱이 부팅되기 전에 메세지 리스너 컨테이너를 구축해야 되기 때문에 afterPropertiesSet() 메소드를 오버라이딩해야 한다. Pub/Sub 모델을 위한 중재자와 구독자 세팅을 대기열 처리 서버에서 책임지기 때문에, 중재자(Subscription)를 사전에 세팅하여 메세지 리스너 컨테이너에게 특정 컨슈머 그룹으로부터 메세지를 수신할 수 있는 컨슈머 네임을 지정, 부여한다. 이를 통해서 구독자(Consumer)가 세팅되고 메세지를 실시간으로 Redis Stream으로부터 읽어오게 된다.

이를 통해서 아까 위에서 언급했던 2번, 읽음 처리가 메세지 수신과 동시에 수행될 수 있다. 읽어온 메세지는 앞서 언급했듯 곧바로 대기 상태로 전환된다.

(2) SSE 연결

클라이언트에게 실시간으로 대기열 처리 현황을 넘겨주기 위한 SSE 구축 역시 필요하다. 꽤나 간단하게 코드가 구성되지만, 앞서 얘기했듯 SSE 연결Redis Stream 메세지 수신의 비정합성을 해결해야 한다. 왜냐하면 인메모리 DB 특성상, Redis는 상상 이상으로 빠르기 때문(...)에 SSE가 연결되기 전에 Redis로부터 메세지가 수신되면 연결된 SSE가 없어서 메세지가 소실될 수 있다.

사전 세팅은 웹소켓처럼 복잡하지 않고 컨트롤러 API 메소드에 어노테이션 값만 할당하면 끝이다.

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter queue(
        @RequestParam("userId") String userId,
        @RequestParam("messageId") String messageId
) {
    log.info("sse 시작: {} / {}", userId, messageId);
    // ...

물론 저 SseEmitter 객체를 반환하게 하기 위해 요청이 들어오는 클라이언트별로 해당 SseEmitter 객체를 생성해줘야 하는데, 그것은 일단 서비스 쪽으로 넘겼다.

@Slf4j
@Service
@RequiredArgsConstructor
public class SseEmitterService {

    private Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private Map<String, QueueDTO> records = new ConcurrentHashMap<>();
    private Map<String, Long> messageTimestamps = new ConcurrentHashMap<>();
    
    // ...

    public SseEmitter createEmitter(String userId, String messageId) {
        messageTimestamps.put(userId, extractTimeStamp(messageId)); // Stream 메세지 ID 저장

        SseEmitter emitter = new SseEmitter(60_000L); // 60초 타임아웃
        emitters.put(userId, emitter); // SSE Emitter 저장

        emitter.onCompletion(() -> emitters.remove(userId));
        emitter.onTimeout(() -> emitters.remove(userId));

        if (records.containsKey(userId)) {
            try {
                String response = objectMapper.writeValueAsString(records.get(userId));
                emitter.send(SseEmitter.event().name("queue").data(response));

                // SSE 연결 종료 책임은 클라이언트에게
            } catch (Exception e) {
                emitter.onCompletion(() -> emitters.remove(userId)); // 전송 실패 시 제거
            }
        }

        return emitter;
    }

ConcurrentHashMap 타입들이 상당히 많은데, 클라이언트의 고유값인 userId를 기반으로 메세지 고유값인 messageId에서 타임스탬프를 추출하여 사전 저장해두고, SseEmitter가 생성되는 시점에서 이미 메세지가 수신되어 있으면 그것을 저장해뒀다가 확인하여 클라이언트로 송신하는 로직을 추가한다. 이를 통해 메세지 소실을 방지할 수 있다.

8080번 포트가 점유하는 트래픽 수용 서버에 요청을 보내면, 8081번 포트에서 SSE를 통해 클라이언트로 실시간 정보가 전달되며, 구독이 이뤄진 클라이언트에서 확인이 가능해진다.

4) 클라이언트 구축 & 메세지 식별값 기반 처리 현황 연산

클라이언트 코드는 크게 설명은 하지 않는다. 단순 바닐라 자바스크립트를 통해 SSE 연결 수신을 구현했다. 리액트에서는 SSE 활용이 조금 다른 것으로 알고 있다.

function joinQueue() {
  const userId = document.getElementById("userId").value.trim();

  if (userId === "") {
    alert("Please enter a user ID");
    return;
  }

  fetch(`http://localhost:8080/queue/join?userId=${userId}`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    }
  })
    .then(response => {
    if (!response.ok) {
      throw new Error("대기열 참가 실패");
    }
    return response.json();
  })
    .then(data => {
    const userId = data.userId;
    const messageId = data.messageId;
    startSSE(userId, messageId); // SSE 시작
  })
    .catch(err => console.error("대기열 참가 실패:", err));
}

function startSSE(userId, messageId) { 
  document.getElementById("modalUserId").textContent = userId;

  eventSource = new EventSource(`http://localhost:8081/stream?userId=${userId}&messageId=${messageId}`, {
    withCredentials: true,
}

사용자 식별값을 쿼리 파라미터로 트래픽 수용 서버에 보내줌과 동시에 받은 응답에 담긴 메세지 식별값을 담아 대기열 처리 서버에 구현된 SSE 연결 구독을 시도한다. 간단한 구현을 위해서지만 실제 서비스에서는 JWT 토큰을 헤더에 담고 메세지 식별값을 요청 바디에 담는 등의 요청이 이뤄질 것이다.

앞서 언급했듯, 메세지 식별값은 타임스탬프 + 인덱스를 통해 고유성을 확보한다. 이 타임스탬프를 적절히 환산해서 사용자 자신의 메세지 식별값과 현재 처리되는 메세지 식별값의 차이를 활용해서 진행률을 계산할 수 있다.

    emitters.forEach((clientUserId, emitter) -> {
        long clientTime = messageTimestamps.getOrDefault(clientUserId, 99999L);
        double processPercent = 100 * Math.pow(Math.log10(10 * ((double) extractTimeStamp(messageId) / clientTime)), 20);
        processPercent = Math.floor(processPercent * 100) / 100.000;
    
    	// ....

    private long extractTimeStamp(String messageId) {
        Pattern pattern = Pattern.compile("^(\\d+)-");
        Matcher matcher = pattern.matcher(messageId); // 타임스탬프 부분
        if (matcher.find()) {
            String timestamp = matcher.group(1).substring(8);  // 임의 서브스트링
            return Long.parseLong(timestamp);
        }

        return 0L;
    }

SseEmitter 객체들을 저장한 곳에서 일괄 조회해서 현재 메세지 식별값에 따른 현황을 지속적으로 송신한다. 나는 변화율의 음수를 방지하고 그 폭을 극단적으로 증가시키기 위해서 로그와 제곱근을 활용해서 연산했다.

3. 테스트 결과

JMeter를 활용하여 가상 사용자 5000명을 상정하고 테스트를 수행한다.

1) 대기열 처리 서버 메세지 수신

트래픽 수용 서버로 트래픽을 쏘면 Redis Stream으로 넘어간 메세지들이 성공적으로 수신되면서 대기열 처리 서버의 히카리풀 로그에 찍히는 것을 볼 수 있다.

2) 클라이언트 실시간 수신

히카리풀 로그보다 클라이언트 데이터 수신이 더 빠르게 확인되는데, 데이터 수신은 정상적으로 이뤄지는 것이 확인되는 걸 보니 데이터 처리 속도가 매우 빨라서 히카리풀 로그가 따라가지 못하는 것으로 보인다(...)

만약 프론트엔드에 익숙하다면 실시간으로 변하는 데이터 처리 현황을 바탕으로 로딩바를 표현해서 좀 더 시각적으로 정보를 전달할 수도 있다.

3) 후기

생각 이상으로 어려운 토이 프로젝트였다. 처음에는 대기열 처리 서버 역시 비동기 처리에 강점을 보이는 Netty를 통해 구현하려 했으나 RedisTemplate에서 제공하는 Redis Stream 관련 세팅이나 기능이 제한적이었고 뭣보다 내가 리액티브 프로그래밍에 아직 익숙치 않았다.

거기에 더불어 늘 생기는 CORS 이슈와 SSE 연결 소실 이슈 등등... 다만 그럼에도 최대한 아키텍처를 유지하고자 했던 것은, 추후 배포 및 실제 서버 운용에서 앞서 언급했듯 대기열의 활용 극대화를 위한 서버 오토 스케일링이나 로드밸런싱을 고려했던 부분이라 그런 부분이 준수되면서 구현된 것은 꽤 만족스럽다.

Redis Stream에 대한 정보들에 대해 틀린 부분이 있으면 댓글 등으로 지적 바라며, 이 포스팅이 대기열이 궁금하신 분들이나 Redis Stream을 접하고자 하는 분들께 도움이 되길 바란다.

소스 코드
https://github.com/kimD0ngjun/queue

profile
scientia est potentia / 벨로그 이사 예정...

0개의 댓글

관련 채용 정보