재밌는 문제가 발생했다. 서비스를 개발하던 중 클라이언트 사이드에게 Noti를 주기 위해, SSE protocol를 사용했다. 그런데 기존에 잘 동작하던 Connection이 메시지를 정상적으로 송수신하지 못하는 상황이 벌어지는 게 아닌가?
원인은 금방 찾을 수 있었다. 기존 서버에는 클라이언트와 SSE Connenction 을 맺어주는 별도의 API가 존재하는데, sticky session이 필요한 원인과 같은 이유로 기존 단일 서버로 호스팅되던 걸, 2개의 서버로 분산처리하도록 ALB에 붙이자 클라이언트가 서버를 번갈아가면서 호출하다보니, 이전서버와 맺은 Connection과 뒤에 호출한 서버의 Emitter가 일치하지 않는 것이다. 다만 이 원인을 어떻게 해결할지 고민하는 데 시간이 걸렸다.
@GetMapping(value = ["/subscribe"], produces = ["text/event-stream"])
fun subscribe(
@AuthenticationPrincipal principalDetails: PrincipalDetail,
@RequestParam eventName: SseEmitterEvent.SseEventName,
): SseEmitter {
//eventname으로 판단
return sseService.subscribe(principalDetails, eventName)
}
fun subscribe(principalDetails: PrincipalDetail, eventName: SseEmitterEvent.SseEventName): SseEmitter {
// 리버스 프록시에서의 오동작을 방지
ServletUtil.getCurrentResponse()?.addHeader("X-Accel-Buffering", "no")
val emitterId = eventName.getEmitterId(principalDetails)
val emitterDto = SseEmitterEvent(
id = emitterId, eventName = eventName,
data = "EventStream Created. [id=$emitterId]"
)
val emitter = emitterRepository.save(emitterId, SseEmitter(60L * 1000 * 60))
// 503 에러를 방지하기 위한 더미 이벤트 전송
this.sendToClient(emitterDto)
return emitter
}
@Repository
class EmitterRepository {
private val log = KotlinLogging.logger { }
private val emitters = ConcurrentHashMap<String, SseEmitter>()
...
클라이언트 쪽 코드는 생략하겠다.
제일 먼저 든 생각은 ConcurrentHashMap 으로 단일 서버 내에서 관리하던 자원을 두 대의 서버가 공유하도록 하는 방향이었다. 외부애 저장소를 하나두고, SSeEmitter 를 거기다 저장시키는 것이다. 시도해보진 않았지만, 상식선에서 불가능할 것 같아 생각만 했다.
로드 밸런싱 환경에서 일관성 없는 Connection이 문제라면, 클라이언트에서 단일 서버로 Connection을 맺어주도록 api를 변경하고, 알람이 필요할 때, 내쪽 서버에서 이 서버로 요청을 날리고 이 단일 서버가 전송하도록 로직을 바꾸면, 문제가 쉽게 해결될 거라 생각했다. 만약 이 방향을 시도한다면 소소하게 바꿔야 하는 코드들이 많아지고, 클라이언트 쪽 개발자하고도 협의를 해야 한다. 무엇보다 관리해야 할 서버가 하나 더 늘어나게 된다. 약간 닭 잡자고, 소칼 쓰는 느낌이다.
이 방법이 가장 간단하고 공수가 적어보여서, 시도해보았다. 처음엔 ALB의 sticy session을 어떻게든 SSE용으로 활용할 수 있지 않을까 시도해보았지만 생각대로 잘 안 된다는 거로 결론..
여차저차해서 4번째 방안으로 넘어갔다. SSE Connection을 단일 서버로 제한해서 연결시키는 걸 배제하고, 그냥 로드밸런싱 중인 서버 모두에게 SSE Emitter를 생성하고 모두 Connection을 맺어주라고 알리는 걸로 퉁 치는걸로. 단순하면서도 효과적인 해결책이라 생각됐다.
여기서 문제는 메시징 서버를 어떤 걸 선택하냐는 것이었다. 대부분 나와 비슷한 문제를 겪고 있는 사람들의 글을 보면 Redis의 Pub/Sub 기능으로 SSE의 Scale Out을 해결했는데, 나는 이 방법을 그리 원하지 않았다. Redis는 지금 프로젝트에서 쓰고 있지 않은데, 이거 기능 하나를 위해서 도입하긴 좀 그렇다. 그것보다 본연의 메시징 기능에 집중한 메시징 솔루션을 선택하는 게 좋아보였다.
지금 프로젝트가 AWS 클라우드 위에 구축되어있으니, AWS 기반 메시징 솔루션을 선택하는 게 자연스러운 흐름 같아 보였다. 처음에는 AWS SQS를 사용하는 걸 생각해봤다. 하지만 SQS에 PUB/SUB (BroadCasting) 기능을 지원하지 않는다는 사실을 알게 되었다. 항상 느끼는 거지만 문제 해결의 구현 시간보다, 어떤 것이 최적의 해결책인지 고민하고, 시도해보고 알아보는 시간이 훨씬 오래 걸린다는 거다. 아래 링크에서 도움을 얻었다.
https://www.youtube.com/watch?v=Dn5irt7bClM&ab_channel=springcamp.io
고민하다, AWS SNS를 사용하기로 결정했다. 지금 상황에서 가장 간단해보이고, 시간이 별로 안 걸리는 해결책이라고 생각했기 때문이다. 그러나 막상 시간을 들여서 구현을 하고 보니, 몇 가지 문제점이 눈에 거슬려서 폐기하기로 했다. 첫 번째 문제는 각 개별 서버가 HTTPS 및 HTTP를 통해 공개적으로 액세스할 수 있어야 한다는 것이다. 지금 서버는 Publis Subnet 에 위치해 있기 때문에, 보안그룹에서 포트를 열어주는 걸로 해결을 할 수 있었지만, 아무래도 찜찜하다..
발행 주소 검증할 때는 webhook.site 가 유용하다..
https://choichumji.tistory.com/123
무엇보다 지금 당장은 로드밸런싱하는 서버가 2대 밖에 없지만, 나중에 서버가 늘어나다 보면, 계속 구독해줘야 하는 서버를 수동으로 늘려야 된다는 것도 부담이었다. 나는 최대한 코드베이스로 구현을 하고 싶었다.
결국 생각을 좀 한 이후에, 다른 메시징 솔루션을 검토해보기로 결정, AMAZON MQ 를 선택했다. Amazon MQ 는 두 가지 종류의 메시징 솔루션을 제공하는데, 나는 RabbitMQ를 선택했다. 별 다른 이유는 없고, 전에 한 번 써본 경험이 있어서이다. 다만 역시 무작정 구현하다보니 문제에 맞닥뜨리게 되었는데..
아 스트레스.. 위의 링크를 보면 알겠지만, RabbitMq는 여러개의 동일 큐에 대한 메시지 전송을 허락하지 않는다. 기본적으로 분배방식은 라운드로빈을 취하고 있으며, 다 같이 받고 싶으면, 별도의 큐이거나 대기열로 지정해줘야 된다. 그러면 서버마다 별도의 queue를 지정해주는 환경변수를 주입해서 해결할 수도 있겠지만..!!!! 나는 그러고 싶지 않다~~ 내가 원하는 방식대로 해결해야한다.
여차저차해서 임시방편 해결책?으로 AnonymousQueue를 활용하는 방안을 알아냈다.
대충 아래처럼 Temporary queues를 활용한 꼼수? 로 해결하는 것 같은데, 지금 내 상황에 적절한 솔루션인 것 같아 채택했다.
https://dirask.com/posts/Spring-Boot-2-broadcast-messages-to-each-application-instance-using-RabbitMQ-1398Vp
Free tier를 지원한다. VPC 세팅하고 간단하게 만들면 된다. 퍼블릭 엑세스 허용, 대충 20분정도 걸린다.
id("org.springframework.boot") version "2.7.6"
id("io.spring.dependency-management") version "1.0.15.RELEASE"
extra["ioCloudVersion"] = "2.4.2"
extra["springCloudVersion"] = "2021.0.5"
dependencyManagement {
imports {
mavenBom("io.awspring.cloud:spring-cloud-aws-dependencies:${property("ioCloudVersion")}")
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
testImplementation("org.springframework.amqp:spring-rabbit-test")
...
}
@Bean
@Primary
fun connectionFactory(): CachingConnectionFactory {
//RabbitMqConnection
//https://medium.com/tradeshift-engineering/spring-rabbitmq-tuning-f94723598312
//https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/getting-started-rabbitmq.html
val connectionFactory = ConnectionFactory()
//connectionFactory.setAddresses(awsProperty.rabbitmq.host)
connectionFactory.host = awsProperty.rabbitmq.host
connectionFactory.username = awsProperty.rabbitmq.username
connectionFactory.setPassword(awsProperty.rabbitmq.password)
connectionFactory.port = awsProperty.rabbitmq.port
connectionFactory.useSslProtocol()
return CachingConnectionFactory(connectionFactory)
}
@Bean
fun progressQueue(): Queue {
return AnonymousQueue()
}
@Bean
fun broadcastExchange(): FanoutExchange {
return FanoutExchange(fanoutExchangeName)
}
@Bean
fun bindingDefault(progressQueue: Queue, exchange: FanoutExchange): Binding {
return BindingBuilder
.bind(progressQueue)
.to(exchange)
//.with(routingKey)
}
//@Bean
fun bindingAll(queueAll: Queue, exchange: TopicExchange): Binding {
//바인딩 된 Queue 중 Routing key와 매칭되는 Queue로만 메시지를 보냄
return BindingBuilder
.bind(queueAll)
.to(exchange)
.with("routing.*")
}
@Bean
fun rabbitTemplate(connectionFactory: ConnectionFactory, messageConverter: MessageConverter): RabbitTemplate {
val rabbitTemplate = RabbitTemplate(connectionFactory)
rabbitTemplate.messageConverter = messageConverter
//rabbitTemplate.defaultReceiveQueue =
//rabbitTemplate.setReplyAddress(queue().getName());
//rabbitTemplate.setReplyTimeout(replyTimeout);
//rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate
}
@Bean
fun jackson2JsonMessageConverter(mapper: ObjectMapper): MessageConverter {
return Jackson2JsonMessageConverter(mapper)
}
@Bean
fun rabbitListenerContainerFactory(connectionFactory: ConnectionFactory, messageConverter: MessageConverter): SimpleRabbitListenerContainerFactory {
val factory =
SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(connectionFactory)
factory.setMessageConverter(messageConverter)
factory.setConcurrentConsumers(10)
factory.setMaxConcurrentConsumers(10)
factory.setErrorHandler(errorHandler())
return factory
}
fun amqpAdmin(connectionFactory: ConnectionFactory): AmqpAdmin {
return RabbitAdmin(connectionFactory)
}
fun errorHandler(): ErrorHandler {
return ConditionalRejectingErrorHandler(MyFatalExceptionStrategy())
}
class MyFatalExceptionStrategy() : DefaultExceptionStrategy() {
override fun isFatal(t: Throwable): Boolean {
if (t is ListenerExecutionFailedException) {
val lefe = t
logger.error(
"Failed to process inbound message from queue "
+ lefe.failedMessage.messageProperties.consumerQueue
+ "; failed message: " + lefe.failedMessage, t
)
}
return super.isFatal(t)
}
}
@GetMapping(value = ["/subscribe"], produces = ["text/event-stream"])
fun subscribe(
@AuthenticationPrincipal principalDetails: PrincipalDetail?,
@RequestParam eventName: SseEmitterEvent.SseEventName,
) : SseEmitter {
return rabbitMqProducer.notifyToSubscribe(principalDetails, eventName)
}
fun notifyToSubscribe(principalDetails: PrincipalDetail?, eventName: SseEmitterEvent.SseEventName): SseEmitter {
ServletUtil.getCurrentResponse()?.addHeader("X-Accel-Buffering", "no")
val emitterId =
eventName.getEmitterId(principalDetails)
val emitter = emitterRepository.save(emitterId, SseEmitter(60L * 1000 * 60))
rabbitTemplate.convertAndSend(exchange.name, "",
RabbitMqPayloadDto<Any>(emitterId, eventName, RabbitMqPayloadType.SUBSCRIBE))
return emitter
}
@RabbitListener(queues = ["#{progressQueue.name}"])
fun receiveProgressQueue(payloadDto: RabbitMqPayloadDto<Any?>){
log.info { "Message received from rabbit mq => $payloadDto" }
try {
when (payloadDto.type) {
RabbitMqPayloadType.INCREASE -> {
val event =
mapper.convertValue(payloadDto.data!!, ProgressEvent::class.java)
eventPublisher.publishEvent(event)
}
RabbitMqPayloadType.SUBSCRIBE -> {
eventPublisher.publishEvent(payloadDto)
}
RabbitMqPayloadType.CLOSE -> {
val event =
mapper.convertValue(payloadDto.data, SseEmitterEvent::class.java)
eventPublisher.publishEvent(event)
}
}
}catch (e:Exception){
log.error { e.stackTraceToString() }
}
}
SSE 관련 이벤트가 있을 때는 먼저 RabbitMqProducer에게 알리고, Consumer 쪽이 메시지를 받으면, 해당 이벤트들을 타입에 따라 전파하는 방향으로 바꿨다.
https://stackoverflow.com/questions/68378010/message-all-instances-behind-load-balancer
https://stackoverflow.com/questions/33471766/node-js-server-sent-events-with-load-balancer
https://stackoverflow.com/questions/54427980/aws-load-balancer-for-server-sent-events-or-websockets
https://smjeon.dev/web/sticky-session/
https://www.baeldung.com/aws-queues-java
https://devocean.sk.com/blog/techBoardDetail.do?ID=163301
https://uchupura.tistory.com/109
https://www.daddyprogrammer.org/post/14825/spring-cloud-messaging-sqs/
https://seungpnag.tistory.com/9
https://highjune.dev/springboot/sqs_fifo/
https://flowlog.tistory.com/87
https://www.rabbitmq.com/tutorials/tutorial-three-spring-amqp.html
https://brunch.co.kr/@springboot/298
https://www.baeldung.com/java-rabbitmq-exchanges-queues-bindings