대기열 - 실시간 처리 구조 RabbitMQ + SSE

heesan·2026년 1월 16일

IMPRESSER

목록 보기
4/5
post-thumbnail

GPU 기반 이미지 압축 서비스인 Impresser 프로젝트에서 이미지를 비동기 큐에 쌓아 처리한 뒤, 처리 결과나 진행 상태를 실시간으로 사용자 UI에 전달해야 했습니다.

이를 위해 서버는 RabbitMQ를 통해 메시지를 비동기 처리하고, SSE로 클라이언트에게 실시간 알림을 전송합니다.

🧩 전체 동작 흐름 요약

  1. 사용자가 이미지를 업로드하고 압축 요청
  2. 백엔드 서버가 RabbitMQ 큐에 메시지를 발행
  3. RunPod에서 실행 중인 GPU 이미지 압축 서버가 큐를 구독하여 작업 수행
  4. 압축 완료 후 결과를 백엔드 API로 전달
  5. 백엔드는 결과를 DB에 저장
  6. SSE를 통해 사용자 UI에 압축 완료 이벤트 전송


이 구조를 통해 대기열 기반 비동기 처리 + 실시간 사용자 피드백을 동시에 만족할 수 있었습니다.


1. RabbitMQ란? — 비동기 메시지 브로커

AMQP(Advanced Message Queuing Protocol, 메시지 지향 미들웨어를 위한 개방형 표준 응용 계층 프로토콜)를 구현한 오픈소스 메세지 브로커입니다.

즉, RabbitMQ는 메시지를 큐에 저장하고, Producer(생산자) ↔ Consumer(소비자) 간 비동기 통신을 가능하게 하는 오픈 소스 메시지 브로커입니다.

클라이언트와 서버가 직접 연결되지 않아도 메시지 전달이 가능하고, 서비스 간의 느슨한 결합(loose coupling) 구조를 만듭니다.

📌 메시지 큐 란?

프로세스 또는 프로그램 간에 데이터를 교환할 때 사용하는 통신방법으로 메시지 지향 미들웨어를 구현한 시스템을 의미합니다.

메시지 지향 미들웨어는 비동기 메시지를 사용하는 응용프로그램 사이에서 데이터를 송수신하는 것을 의미합니다.

여기서 메시지는 요청, 응답 및 오류 등 다양한 정보 등의 데이터가 될 수 있습니다.

📌 메시지 큐를 사용하는 경우

메시지 큐는 즉시 응답이 필요한 요청-응답 흐름에는 적합하지 않습니다.

❌ 사용하지 않는 예

  • 회원가입 성공/실패 여부
  • 비밀번호 변경 결과

⭕ 사용하기 좋은 예

  • 회원가입 완료 알림 메일
  • 비밀번호 변경 알림
  • 이미지 압축, 영상 변환, 대용량 연산 작업

👉 Impresser의 이미지 압축은 처리 시간이 길고, 결과만 나중에 전달하면 되는 작업이므로 메시지 큐에 매우 적합했습니다.

🔹 기본 개념

  • Producer: 메시지를 생성해 큐에 보낸다.
  • Exchange: 메시지를 라우팅하는 역할.
  • Queue: 메시지를 저장하는 임시 저장소.
  • Consumer: 큐로부터 메시지를 받아 처리한다.
  • RabbitMQ는 AMQP 프로토콜을 따르며, 다양한 routing과 Pub/Sub 구조를 지원합니다.
  • Binding : Exchange에게 알맞은 큐에 메시지를 라우팅 할 때 규칙을 지정하는 행위, Exchange의 종류에 따라 지정하는 방식이 달라집니다.

    (Pub/Sub는 메시지를 생성하는 서비스를 해당 메시지를 처리하는 서비스에서 분리하는 확장 가능한 비동기 메시징 서비스입니다.)

📌 왜 RabbitMQ를 쓸까?

✔ 처리 지연이 있을 수 있는 작업은 큐에 쌓아 비동기 처리
✔ 서비스 간 높은 트래픽 분산 및 장애 시 메시지 누락 예방
✔ 확장성과 유연성이 높음


2.📡 Server‑Sent Events (SSE)

SSE란?

Server‑Sent Events는 HTTP 기반의 단방향 실시간 스트리밍 기술입니다.
클라이언트가 EventSource 객체를 통해 하나의 GET 요청을 서버에 보내면, 서버는 text/event-stream MIME 타입으로 연속적인 이벤트를 푸시합니다.

특징

  • 자동 재연결: 연결이 끊어지면 브라우저가 자동으로 재시도합니다.
  • 경량: 별도 WebSocket 핸드쉐이크가 필요 없으며, HTTP/1.1만 지원하면 동작합니다.
  • 단순: 서버는 SseEmitter(Spring) 혹은 직접 문자열을 스트림에 쓰면 됩니다.
  • 일방향: 서버 → 클라이언트만 전송, 클라이언트 → 서버는 일반 HTTP 요청을 사용합니다.

왜 SSE를 사용?

상황전통적인 방법 (Polling)SSE 사용 시 장점
이미지 압축 작업은 길게 걸리며 결과가 나중에 도착매번 API를 호출해야 하며, 압축이 끝나지 않아도 불필요한 트래픽 발생서버가 압축 완료 시점에 즉시 푸시 → UI가 바로 업데이트
작업 진행 상황 (예: 단계별 진행률)클라이언트가 주기적으로 상태를 조회해야 함서버가 진행 단계를 실시간으로 전송 → 사용자는 “진행 중” UI를 즉시 확인
다수의 사용자가 동시에 압축 요청각 클라이언트마다 폴링 요청 발생 → 서버·네트워크 부하 증가단일 SSE 연결로 여러 이벤트 전송 → 서버·네트워크 부하 감소

(polling(폴링)은 리얼타임 웹을 위한 기법으로, 일정한 주기(특정한 시간)을 가지고 서버와 응답을 주고 받는 방식을 말한다.)


전체 흐름 (다이어그램)


1️⃣ RabbitMQ 관련 코드

1-1️⃣ RabbitMQConfig.java – 설정 및 Exchange / Queue 정의

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    /* ---------- 1️⃣ ConnectionFactory ---------- */
    @Value("${spring.rabbitmq.host}")      private String rabbitmqHost;
    @Value("${spring.rabbitmq.port}")      private int    rabbitmqPort;
    @Value("${spring.rabbitmq.username}")  private String rabbitmqUsername;
    @Value("${spring.rabbitmq.password}")  private String rabbitmqPassword;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory(rabbitmqHost, rabbitmqPort);
        cf.setUsername(rabbitmqUsername);
        cf.setPassword(rabbitmqPassword);
        cf.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
        cf.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);
        return cf;
    }

    /* ---------- 2️⃣ RabbitAdmin (자동 선언) ---------- */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    /* ---------- 3️⃣ JSON 직렬화 ---------- */
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /* ---------- 4️⃣ RabbitTemplate (MessageConverter 적용) ---------- */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory cf,
                                         Jackson2JsonMessageConverter json) {
        RabbitTemplate tmpl = new RabbitTemplate(cf);
        tmpl.setMessageConverter(json);
        return tmpl;
    }

    /* ---------- 5️⃣ DirectExchange / Queue / Binding ---------- */
    public static final String COMPRESS_EX = "compress.direct";   // 이미지 압축용
    public static final String PRINT_EX    = "print.direct";      // 설비(프린트)용
    public static final int    N_SHARDS   = 32;                  // 압축 샤드 수

    @Bean
    public DirectExchange compressExchange() {
        return ExchangeBuilder.directExchange(COMPRESS_EX).durable(true).build();
    }

    @Bean
    public DirectExchange printExchange() {
        return ExchangeBuilder.directExchange(PRINT_EX).durable(true).build();
    }

    /* 샤드 큐 32개 (compress.shard.0 ~ compress.shard.31) */
    @Bean
    public Declarables compressShardTopology(@Qualifier("compressExchange") DirectExchange ex) {
        List<Declarable> decl = new ArrayList<>();
        for (int i = 0; i < N_SHARDS; i++) {
            String qName = "compress.shard." + i;
            Queue q = QueueBuilder.durable(qName)
                    .withArgument("x-dead-letter-exchange", DLX)
                    .build();
            Binding b = BindingBuilder.bind(q).to(ex).with(qName);
            decl.add(q);
            decl.add(b);
        }
        return new Declarables(decl);
    }

    /* 설비용 DirectExchange + 10개의 큐 (inkjet-1 ~ inkjet-10) */
    @Bean
    public Declarables printTopology(@Qualifier("printExchange") DirectExchange ex) {
        List<Declarable> decl = new ArrayList<>();
        for (int i = 1; i <= INKJET_COUNT; i++) {
            String qName = "inkjet-" + i + ".print.q";
            String routingKey = "inkjet-" + i;
            Queue q = QueueBuilder.durable(qName)
                    .withArgument("x-dead-letter-exchange", DLX)
                    .build();
            Binding b = BindingBuilder.bind(q).to(ex).with(routingKey);
            decl.add(q);
            decl.add(b);
        }
        return new Declarables(decl);
    }

    /* DLX / DLQ (실패 메시지 보관) */
    public static final String DLX = "dlx.topic";
    public static final String DLQ = "tasks.dlq";

    @Bean
    public TopicExchange dlx() {
        return ExchangeBuilder.topicExchange(DLX).durable(true).build();
    }

    @Bean
    public Queue dlq() {
        return QueueBuilder.durable(DLQ).build();
    }

    @Bean
    public Binding dlqBinding(TopicExchange dlx, Queue dlq) {
        return BindingBuilder.bind(dlq).to(dlx).with("#");
    }
}

📌 핵심 포인트

  • ConnectionFactory → RabbitMQ 서버와 연결, 자동 복구 활성화
  • RabbitTemplate + Jackson2JsonMessageConverter → 객체를 JSON 으로 직렬화해 전송
  • compress.direct / print.direct 두 개의 DirectExchange 로 압축과 프린트 라우팅 분리
  • 샤드 큐 32개 (compress.shard.*) 로 수평 확장 지원
  • DLX / DLQ 로 실패 메시지 안전하게 보관

1-2️⃣ Producer – 메시지 발행


PrintMessageProducer.java (프린트용)

@Service
@RequiredArgsConstructor
public class PrintMessageProducer {
    private final RabbitTemplate rabbitTemplate;

    @Value("${spring.rabbitmq.print.exchange}")
    private String printExchange;

    public void sendPrintMessage(String routingKey,
                                 CompressImageMessage message) {
        rabbitTemplate.convertAndSend(printExchange, routingKey, message);
    }
}

ImageMessageProducer.java (압축용)

@Service
@RequiredArgsConstructor
public class ImageMessageProducer {
    private final RabbitTemplate rabbitTemplate;

    @Value("${spring.rabbitmq.compress.exchange}")
    private String compressExchange;

    public void sendCompressMessage(String routingKey,
                                    CompressImageMessage message) {
        rabbitTemplate.convertAndSend(compressExchange, routingKey, message);
    }
}

📌 흐름

  • 클라이언트가 /api/compress 로 파일 전송
  • 백엔드 컨트롤러가 ImageMessageProducer.sendCompressMessage(...) 호출
  • routingKey = compress.shard.{i} 로 샤드 큐에 메시지 발행
  • GPU-RunPod 가 큐를 구독하고 압축 수행

2️⃣ SSE (Server-Sent Events) 구현


2-1️⃣ 구독 엔드포인트 – SseController.java

@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor
public class SseController {
    private final SseService sseService;

    @GetMapping(value = "/subscribe",
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe() {
        UUID userUuid = SecurityUtil.getCurrentUserUuid()
                .orElseThrow(() -> new BusinessException(ErrorCode.INVALID_ACCESS_TOKEN));
        return sseService.subscribe(userUuid);
    }
}
  • produces = MediaType.TEXT_EVENT_STREAM_VALUE 로 SSE 스트림 명시
  • 사용자 UUID 기준으로 구독 관리

2-2️⃣ 서비스 – SseService.java

@Service
@RequiredArgsConstructor
@Slf4j
public class SseService {

    private final EmitterRepository sseEmitterRepository;
    private static final long DEFAULT_TIMEOUT = 15 * 60 * 1000L;

    public SseEmitter subscribe(UUID userUuid) {
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        sseEmitterRepository.add(userUuid, emitter);

        emitter.onCompletion(() -> sseEmitterRepository.remove(userUuid, emitter));
        emitter.onTimeout(()    -> sseEmitterRepository.remove(userUuid, emitter));
        emitter.onError(e ->    sseEmitterRepository.remove(userUuid, emitter));

        sendInitAndPing(emitter, userUuid);
        return emitter;
    }

    public void sentToClient(UUID userUuid, String eventName, Object data) {
        Collection<SseEmitter> emitters = sseEmitterRepository.getAll(userUuid);
        if (emitters.isEmpty()) return;

        String eventId = userUuid + "_" + System.currentTimeMillis();
        for (SseEmitter emitter : new ArrayList<>(emitters)) {
            try {
                emitter.send(SseEmitter.event()
                        .name(eventName)
                        .id(eventId)
                        .data(data));
            } catch (IOException e) {
                sseEmitterRepository.remove(userUuid, emitter);
            }
        }
    }

    private void sendInitAndPing(SseEmitter emitter, UUID userUuid) {
        try {
            emitter.send(SseEmitter.event()
                    .name("INIT")
                    .id(userUuid + "_init_" + System.currentTimeMillis())
                    .data("ok"));
            emitter.send(SseEmitter.event()
                    .name("PING")
                    .id(userUuid + "_ping_" + System.currentTimeMillis())
                    .data("ping"));
        } catch (IOException e) {
            sseEmitterRepository.remove(userUuid, emitter);
            emitter.completeWithError(e);
        }
    }
}

2-3️⃣ EmitterRepository (핵심 저장소)

@Component
public class EmitterRepository {
    private final ConcurrentMap<UUID, Set<SseEmitter>> emitters = new ConcurrentHashMap<>();

    public void add(UUID userUuid, SseEmitter emitter) {
        emitters.computeIfAbsent(userUuid, k -> ConcurrentHashMap.newKeySet()).add(emitter);
    }

    public void remove(UUID userUuid, SseEmitter emitter) {
        Set<SseEmitter> set = emitters.get(userUuid);
        if (set != null) {
            set.remove(emitter);
            if (set.isEmpty()) emitters.remove(userUuid);
        }
    }

    public Collection<SseEmitter> getAll(UUID userUuid) {
        return emitters.getOrDefault(userUuid, Collections.emptySet());
    }
}

3️⃣ 실제 동작 예시

@PostMapping("/compress")
public ResponseEntity<Void> requestCompress(@RequestPart MultipartFile file,
                                            @AuthenticationPrincipal UserDetails user) {
    UUID userUuid = ((CustomUser) user).getUuid();

    CompressImageMessage msg = new CompressImageMessage(
            file.getOriginalFilename(),
            file.getBytes(),
            userUuid
    );

    String routingKey = "compress.shard." + (userUuid.hashCode() & 0x1F);
    imageMessageProducer.sendCompressMessage(routingKey, msg);

    return ResponseEntity.accepted().build();
}

public void handleCompressResult(CompressResultDto result) {
    compressResultRepository.save(result);

    sseService.sentToClient(
            result.getUserUuid(),
            "COMPRESS_DONE",
            result
    );
}

클라이언트는 EventSource/sse/subscribe 를 구독하고 있다가
COMPRESS_DONE 이벤트를 받으면 UI 에 “압축이 완료되었습니다” 알림을 표시합니다.


4️⃣ 정리

요구사항RabbitMQSSE
비동기 처리메시지를 큐에 적재 → 백엔드 즉시 응답작업 완료 시 실시간 알림
신뢰성메시지 영속성, 재시도 가능자동 정리 및 재연결
복잡도Spring AMQP 설정만으로 구현SseEmitter 기반 단순 구조

마무리

RabbitMQ가 “작업을 백그라운드로 넘기는 역할”을,
SSE가 “작업 완료를 UI로 알려주는 역할”을 담당하여
비동기 처리와 실시간 피드백을 깔끔하게 분리했습니다.

profile
👩‍💻Backend Engineering

0개의 댓글