
얼마 전 올리브영 쿠폰 시스템의 성장기를 보면서 Redis Pub/Sub에 대해 접하게 됐다. 개선 과정에서 Redis Pub/Sub이 메시지 전송을 100% 보장하지 못한다는 특징이 있어, 이를 개선하는 과정도 나와 있었다. 이를 보면서 "왜 Redis Pub/Sub은 메시지 전송을 100% 보장하지 못하지?"라는 의문이 생겨 Redis Pub/Sub에 대해 알아보기로 했다.
Redis Pub/Sub은 Redis가 제공하는 메시징 패턴으로, Publisher가 메시지를 특정 채널에 발행하면 해당 채널을 Subscribe하고 있는 모든 구독자 Subscriber에게 실시간으로 메시지를 전달하는 방식이다. Redis Pub/Sub는 메모리 기반으로 동작하여 매우 빠른 속도로 메시지를 전달할 수 있다.

Redis 공식 문서를 보면 다음과 같이 나와 있다.


Redis Pub/Sub은 "at-most-once"라는 방식으로 메시지를 전달하고, 메시지를 전달하는 과정에서 어떤 이유로 전달하지 못했을 경우에는 메시지가 영구적으로 손실된다고 한다.


이 때, 메시지를 전달하고 메시지를 잘 받았다는 ACK가 오기를 기다린다. 이러한 과정으로 메시지 중복 전달이 생긴다.
Exactly-Once는 At-Least-Once와 메시지 ID를 함께 사용한다. Subscriber가 메시지를 받을 때, 받은 적 있는 메시지인지를 메시지 ID를 통해 구분한다. 중복된 메시지일 경우 저장(혹은 처리ㅊ과정) 없이 ACK만 보낸다.
Redis Pub/Sub은 메시지를 안전하게 전달한다고 보장하지 못한다. 그렇다면 메시지를 안전하게 전달하려면 뭘 써야 할까?

Redis 공식 문서에서는 Redis Stream을 사용하길 권장한다고 한다. Redis Stream은 "At-Least-Once" 방식을 지원한다고 한다.
Redis 이외의 Message Queue를 사용하는 것도 방법일 것 같다.
공부하며 Redis Pub/Sub이 메시지 전달을 100% 보장하지 않는 이유는 의도적인 설계라고 생각했다.
처음, 이 의문을 품었을 때는 메시지 유실이 모든 상황에서 치명적인 리스크처럼 느껴졌다. 하지만 채팅방의 ‘상대방이 작성 중…’ 상태 알림이나 모니터링 시스템의 신호 전달 같은 사례를 떠올리며 생각이 바뀌었다. 이런 경우 메시지는 단발성이 아니라 지속적으로 반복되며, 일부가 누락되더라도 전체 흐름이나 상태만 유지되면 문제가 되지 않는다.
이처럼 메시지 전달의 신뢰성보다 속도와 낮은 오버헤드가 더 중요한 상황에서는 약간의 유실을 감수하는 것이 오히려 합리적인 선택이 될 수 있다. Redis Pub/Sub은 바로 이런 요구에 최적화된 구조다.
반대로 메시지 유실이 곧바로 비즈니스 장애로 이어지는 경우라면, 비교적 속도가 느리더라도 메시지 전달의 신뢰성을 보장하는 Redis Stream과 같은 기술을 선택하는 것이 더 적절하다.
결국 Redis는 단일한 정답을 강요하지 않고, 상황에 따라 안정성과 성능 사이의 트레이드오프를 선택할 수 있도록 여지를 남겨두었으며 Redis Pub/Sub 역시 그 맥락에서 설계된 도구라고 생각한다.
이제 간단한 실습을 통해 Redis Pub/Sub을 써보고, 실제로 메시지 손실이 발생하는지도 확인해 보자.
환경: Java 17 + Spring Boot 4.0 + Gradle, Docker, JUnit5
참고: GitHub Repository
우선 Docker로 테스트를 위한 Redis를 띄워준다.
sudo docker run --name redis-pubsub \
-p 6379:6379 \
-d redis:7.2-alpine
다음 종속성과 함께 Spring Boot 프로젝트를 생성해 준다.
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.boot:spring-boot-starter-webmvc'
application.yml
spring:
data:
redis:
host: localhost
port: 6379
RedisPubSubConfig.class
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
class RedisPubSubConfig {
private static final String CHANNEL = "message.issue";
@Bean
public ChannelTopic messageTopic() {
return new ChannelTopic(CHANNEL);
}
@Bean
public MessageListenerAdapter messageListenerAdapter(MessageSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "receiveMessage");
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
ChannelTopic topic
){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, topic);
return container;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
MessagePublisher.class
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class MessagePublisher {
private final StringRedisTemplate redisTemplate;
private final ChannelTopic channelTopic;
public void publish(String message) {
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
}
MessageSubscriber.class
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MessageSubscriber {
@Getter
private static int COUNT = 0; // 테스트를 위한 카운트
public void receiveMessage(String message) {
COUNT += 1;
log.info("[Redis SUB] received: {} == count: {}", message, COUNT);
}
}
구현이 끝났으면 테스트를 작성해 준다. 테스트는 1만 명이 동시에 Publish를 했을 때, Subscriber가 모두 메시지를 받는지 확인한다.
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
@SpringBootTest
public class RedisPubSubTest {
@Autowired
private MessagePublisher messagePublisher;
@Autowired
private MessageSubscriber messageSubscriber;
@Test
void publishConcurrentlyWithExecutor() throws InterruptedException {
int n = 10_000;
int threadSize = 200;
ExecutorService pool = Executors.newFixedThreadPool(threadSize);
CountDownLatch ready = new CountDownLatch(n);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(n);
AtomicInteger ok = new AtomicInteger(0);
AtomicInteger fail = new AtomicInteger(0);
for (int i=0; i<n; i++) {
pool.submit(() -> {
ready.countDown();
try {
start.await();
try {
messagePublisher.publish("test message");
ok.incrementAndGet();
} catch (Exception e) {
fail.incrementAndGet();
} finally {
done.countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
ready.await(10, TimeUnit.SECONDS);
start.countDown();
done.await(60, TimeUnit.SECONDS);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(MessageSubscriber.getCOUNT()).isEqualTo(n);
});
pool.shutdownNow();
}
}

예상대로 1만 개의 요청을 보냈는데, 9,997개의 메시지만 전달된 것을 확인할 수 있었다.

하지만 항상 메시지 손실이 발생하는 점은 아니라는 것도 확인할 수 있었다.