이전 포스팅에서 다룬 ActiveMQ의 기본 개념을 바탕으로, 이번에는 실제 프로덕션 환경에서 ActiveMQ를 활용하는 심화 내용을 다루겠습니다.
특히 대규모 트래픽 처리, 장애 복구, 모니터링 등 실무에서 마주할 수 있는 다양한 상황들을 중심으로 설명하겠습니다.
ActiveMQ에서 중요한 메시지의 유실을 방지하기 위한 지속성(Persistence) 구현 방법을 살펴보겠습니다.
import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.core.JmsTemplate
@Configuration
class ActiveMQConfig {
@Bean
fun connectionFactory(): ActiveMQConnectionFactory {
return ActiveMQConnectionFactory("tcp://localhost:61616").apply {
isPersistenceEnabled = true
}
}
@Bean
fun jmsTemplate(): JmsTemplate {
return JmsTemplate().apply {
connectionFactory = connectionFactory()
isDeliveryPersistent = true
}
}
}
고가용성을 위한 ActiveMQ 클러스터 구성 코드입니다.
# application.yml
spring:
activemq:
broker-url: failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false
user: admin
password: admin
pool:
enabled: true
max-connections: 10
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.jms.annotation.JmsListener
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
@Service
class OrderEventPublisher @Autowired constructor(
private val jmsTemplate: JmsTemplate
) {
fun publishOrderEvent(event: OrderEvent) {
jmsTemplate.convertAndSend("order.topic", event) { message ->
message.setStringProperty("eventType", event.type)
message.jmsExpiration = System.currentTimeMillis() + 3600000 // 1시간
message
}
}
}
@Service
class OrderEventSubscriber {
@JmsListener(destination = "order.topic", selector = "eventType = 'CREATED'")
fun handleOrderCreated(event: OrderEvent) {
// 주문 생성 이벤트 처리 로직
log.info("New order created: {}", event.orderId)
}
@JmsListener(destination = "order.topic", selector = "eventType = 'UPDATED'")
fun handleOrderUpdated(event: OrderEvent) {
// 주문 수정 이벤트 처리 로직
log.info("Order updated: {}", event.orderId)
}
}
에러 발생 시 메시지 처리를 위한 DLQ 구현입니다.
import org.apache.activemq.command.ActiveMQQueue
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import javax.jms.JMSException
import javax.jms.Message
@Configuration
class DeadLetterConfig {
@Bean
fun orderDLQ(): ActiveMQQueue {
return ActiveMQQueue("DLQ.order")
}
@Service
class DeadLetterProcessor {
@JmsListener(destination = "DLQ.order")
fun processFailedMessages(message: Message) {
try {
// 실패한 메시지 재처리 로직
val messageId = message.jmsMessageID
log.error("Processing failed message: {}", messageId)
// 실패 원인 분석 및 처리
} catch (e: JMSException) {
log.error("Error processing DLQ message", e)
}
}
}
}
import io.micrometer.core.instrument.MeterRegistry
import org.apache.activemq.command.ActiveMQQueue
import org.apache.activemq.management.QueueViewMBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
@Component
class ActiveMQMonitor @Autowired constructor(
private val registry: MeterRegistry
) {
@Scheduled(fixedRate = 60000) // 1분마다 실행
fun monitorQueueMetrics() {
try {
val queue = ActiveMQQueue("order.queue")
val queueView: QueueViewMBean = getQueueViewMBean(queue)
// 큐 깊이 모니터링
registry.gauge("activemq.queue.depth", queueView.queueSize)
// 메시지 처리율 모니터링
registry.gauge("activemq.queue.enqueue.rate", queueView.enqueueCount)
// 컨슈머 수 모니터링
registry.gauge("activemq.queue.consumer.count", queueView.consumerCount)
} catch (e: Exception) {
log.error("Error monitoring ActiveMQ metrics", e)
}
}
}
import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class ActiveMQOptimizedConfig {
@Bean
fun connectionFactory(): ActiveMQConnectionFactory {
return ActiveMQConnectionFactory().apply {
// 연결 풀 설정
isUseAsyncSend = true
isOptimizeAcknowledge = true
// 메모리 사용 최적화
isOptimizedMessageDispatch = true
isAlwaysSessionAsync = false
// 압축 설정
isUseCompression = true
}
}
}
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.ConnectionFactory
import org.springframework.jms.listener.DefaultJmsListenerContainerFactory
import org.springframework.util.backoff.ExponentialBackOff
import org.springframework.amqp.AmqpRejectAndDontRequeueException
@Configuration
class RetryConfig {
@Bean
fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
return DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
// 재시도 정책 설정
setErrorHandler { t ->
log.error("Failed to process message", t)
throw AmqpRejectAndDontRequeueException("Failed to process message", t)
}
setBackOff(ExponentialBackOff(1000L, 2.0)) // 초기 대기 시간, 배수 설정
}
}
}
import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import java.time.Duration
@Service
class OrderMessageProcessor {
private val circuitBreaker: CircuitBreaker
init {
val config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofMinutes(1))
.permittedNumberOfCallsInHalfOpenState(3)
.slidingWindowSize(10)
.build()
circuitBreaker = CircuitBreaker.of("orderProcessor", config)
}
@JmsListener(destination = "order.queue")
fun processOrder(message: OrderMessage) {
circuitBreaker.executeRunnable {
// 주문 처리 로직
processOrderWithRetry(message)
}
}
}
이번 포스팅에서는 ActiveMQ의 고급 기능들과 실제 운영 환경에서 필요한 다양한 패턴들을 살펴보았습니다.
특히 메시지 지속성, 클러스터링, 성능 모니터링, 장애 대응 등 실무에서 적용할 수 있는 구체적인코드들을 다뤘습니다.
실제 환경에서는 이러한 패턴들을 상황에 맞게 조합하고 최적화하여 사용하는 것이 중요합니다.
또한, 지속적인 모니터링과 성능 최적화를 통해 안정적인 메시징 시스템을 구축하고 운영할 수 있습니다.
다음 포스팅에서는 이러한 패턴들을 실제 마이크로서비스 아키텍처에 적용하는 방법과 대규모 트래픽 처리를 위한 추가적인 최적화 기법들을 다루도록 하겠습니다.