레디스에서 사용자의 순위가 변한다던지 참가열로 이동하는 행위가 발생하였을 때 kafka로 메세지를 전송하게 되는데 이러한 전송이 실패할 경우 대기열에서의 사용자의 상태 정보 변화가 누락될 수 있음
따라서, DB와 Debezium을 사용하여 메세지 전송에 대한 재시도와 순서 보장을 했지만, 만약 DB에 장애가 발생했을 때 이는 단일 실패 지점이 되는 심각한 문제를 초래할 수 있다고 생각하였습니다.
따라서, DB를 제거하고 직접 카프카로 이벤트를 발행하는 방식으로 변경하였습니다.
이 프로젝트에서는 사용자의 순위 변동이나 참가열 접근 권한 변경 사항을 실시간으로 전달하기 위해 SSE Sink를 사용했습니다.
단일 서버 환경에서는 sink를 통해 이벤트를 전달하는데에 있어 문제가 발생하지 않지만, 분산 환경에서는 문제가 발생합니다.
SSE Sink는 서버별 독립적으로 동작하기 때문입니다.
⇒ 특정 서버로 전달된 이벤트는 다른 서버로는 전달되지 않음
⇒ 각 서버별 sink의 동기화가 이루어지지 않음
이를 해결하기 위해 Redis의 Pub/Sub 기능을 사용하였습니다.
모든 서버가 Redis의 특정 채널을 구독하도록 하고, 이벤트 발생 시 해당 이벤트를 Redis로 발행하여 이후 Redis 채널을 구독 중인 각 서버가 자신의 SSE Sink로 이벤트를 전달하여, 모든 서버의 Sink가 동일한 이벤트를 받을 수 있도록 하였습니다.

동작 과정
queueing-system 토픽으로 메시지가 발행됩니다.메세지 브로커로 Redis Pub/Sub을 사용
여러 서버들을 동일한 카프카 consumer group으로 묶습니다.
서버들을 동일한 카프카 consumer group 으로 설정하면 produce 된 메세지를 반드시 서버 중 하나만 처리하게 됩니다.
따라서, 모든 서버로 메세지를 전달하기 위한 브로커 역할로 Redis Pub/Sub을 사용하였습니다.
각 서버는 레디스 채널을 구독하게 하고, 카프카 컨슈머가 받은 메시지를 이 레디스 채널에 다시 전달합니다. 이렇게 하면 레디스가 해당 메시지를 구독 중인 모든 서버로 전파하여, 모든 서버가 같은 메시지를 받도록 하였습니다.
Redis Pub/Sub이란 ?
특정 주제 ( 토픽 또는 채널 )을 구독한 사용자에게 메세지를 발행하는 통신 방법으로, publisher가 메시지를 publish 하면 해당 주제를 구독하고 있는 subscriber 들이 메시지를 받을 수 있는 구조
redis는 in-memory 방식이기 때문에 다른 방식보다 매우 빠르게 메세지를 주고 받을 수 있는 장점이 있지만, publish 한 메세지는 따로 저장되지 않으며 subscriber들이 수신했는지 확인할 수 없다는 단점이 있습니다.
따라서, 메세지를 빠르게 보내야하고, 따로 저장하지 않아도 되며 확인이 필요하지 않을 때
즉, 확실히 전송이 보장되지 않아도 상관 없는 상황에서 사용하기 좋습니다.
Redis Pub/Sub 방식을 사용하면 메세지를 받지 못하는 현상이 발생할 수 있음
Pub/Sub 구조에서는 발행 시점에 구독이 끊긴다던지, 처리 도중에 장애가 생길 경우 메세지가 유실되어 재처리도 할 수 없는 심각한 장애가 발생할 수도 있는데, 이를 해결하기 위해 redis의 list 자료구조를 사용하여 발급 요청 데이터를 RPush로 큐에 저장하고, 주기적으로 큐를 확인하여 데이터가 존재하는 경우LPop으로 꺼내어 처리하는 방식을 사용하였습니다.
이는 list에 저장된 데이터가 redis에 저장되어 있으므로 발행 순간에 놓쳐서 유실되는 문제를 방지할 수 있습니다.
Redis Pub/Sub의 한계
Redis Pub/Sub은 메세지를 발행하는 순간에만 구독자에게만 전달하고 발행 했을 때 연결되어있지 않다면 메세지가 사라지며, 영구적으로 저장하거나 보관하지 않기 때문에 만약 메세지 전송에 실패한다면 재전송이나 재처리에 어려움을 가질 수 있음
또한 절대적으로 메세지 순서를 보장하지 않음
만약 여러 발행자가 거의 동시에 메세지를 보낸다면, 네트워크 지연, redis 내부 처리 순서에 따라 구독자가 받는 순서가 바뀔 수 있음
따라서, 단순히 별도로 상태를 관리하지 않아도 되는 메세지의 경우 이를 사용하는게 적합할 수 있지만, 메세지에 많은 정보를 포함해야 하거나 재처리 및 순서 제어에 대한 로직이 필요하다면 RabbitMQ나 kafka를 사용하는 것이 좋음
하지만 순서 보장이 필요하지는 않음
따라서 전달이 실패했을 때 재시도 로직만 작성하면 됨
구현
@Configuration
class RedisConfig(
@Value("\${spring.redis.host}") val host: String,
@Value("\${spring.redis.port}") val port: Int
): Loggable {
@Bean
fun lettuceConnectionFactory(): LettuceConnectionFactory {
return LettuceConnectionFactory(host, port)
}
/** Reactive RedisTemplate */
@Bean
@Primary
fun reactiveRedisTemplate(): ReactiveRedisTemplate<String, String> {
val context = RedisSerializationContext
.newSerializationContext<String, String>(StringRedisSerializer())
.value(StringRedisSerializer())
.build()
return ReactiveRedisTemplate(lettuceConnectionFactory(), context)
}
/*
* ReactiveRedisMessageListenerContainer : Spring Data Redis에서 제공하는 Reactive Redis pub/sub의 subscribe를 위한 메세지 리스너
* ⇒ 특정 채널에서 비동기적으로 메세지를 수신할 수 있음
*
* receiveMessages : 메세지 수신 처리가 일어나는 함수
* */
@Bean
fun listenerContainer(
// 자동으로 lettuceConnectionFactory Bean이 주입됨
connectionFactory: ReactiveRedisConnectionFactory
)
: ReactiveRedisMessageListenerContainer {
return ReactiveRedisMessageListenerContainer(connectionFactory)
}
} @Service
class RedisMessageListenerService(
private val redisMessageListenerContainer: ReactiveRedisMessageListenerContainer
): Loggable {
@PostConstruct
fun init() {
receiveMessages(redisMessageListenerContainer, ChannelTopic(CHANNEL_NAME))
}
fun receiveMessages(
messageListenerContainer: ReactiveRedisMessageListenerContainer,
channel: ChannelTopic
) {
val serializer = createChannelAndValueSerializer()
// 특정 채널을 구독하고
messageListenerContainer.receive(listOf(channel), serializer, serializer)
.flatMap {
val event = it.message
mono {
SseEventService.sink.tryEmitNext(QueueEventPayload(event))
}
}
.onErrorContinue { e, _ ->
log.warn { "redis subscribe error: ${e.message}" }
}
.subscribe()
}
private fun createChannelAndValueSerializer(): RedisSerializationContext.SerializationPair<String> {
return RedisSerializationContext.SerializationPair.fromSerializer(StringRedisSerializer())
}
} @Component
class RedisPublisher(
private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>
) {
fun publish(channel: String, message: String) {
// 특정 채널로 메세지를 전달 ( 이 채널을 구독 중인 클라이언트에게 전달할 수 있음 )
// .subscribe() 꼭 붙여야 함 !!!!!
reactiveRedisTemplate.convertAndSend(channel, message).subscribe()
}
}
[ **subscribe()을 꼭 붙여야 되는 이유 ]**
리액티브 프로그래밍에서 데이터 흐름이 지연 실행 되기 때문
Mono, Flux와 같은 Reactive 타입은 데이터 흐름을 정의만 할 뿐, 실제로 아무 작업도 수행하지 않음
작업이 시작되는 시점은 ".subscribe()"가 호출될 때 !
⇒ subscribe()`가 있어야만 내부 파이프라인이 실행되고, 네트워크 요청, Redis publish 등이 실제로 수행
// 이 시점에서는 publish 로직이 정의만 됨, 실제 Redis로는 아직 전달되지 않은 상태
reactiveRedisTemplate.convertAndSend(channel, message)
// 이게 호출되어야만 publish 동작이 실제로 실행되어 Redis 서버에 메시지가 발행됨
.subscribe() @Service
class KafkaConsumerService(
private val objectMapper: ObjectMapper,
private val redisPublisher: RedisPublisher
): Loggable {
@KafkaListener(topics = ["queueing-system"], groupId = "queue-event-group")
fun consume(message: String) {
try {
val messageDto: KafkaMessageDto = objectMapper.readValue(message, KafkaMessageDto::class.java)
log.info { "queueType : ${messageDto.queueType} , 실행" }
redisPublisher.publish(CHANNEL_NAME, messageDto.queueType)
// queueService.sink.tryEmitNext(QueueEventPayload(messageDto.queueType))
log.info{ "Kafka 이벤트 수신 - queueType: ${messageDto.queueType}" }
} catch (e: Exception) {
log.error{ "Kafka 메시지 consume 실패" }
}
}
}