ActiveMQ와 Spring Boot 실전 코드와 성능 최적화

1. ActiveMQ

이전 포스팅에서 다룬 ActiveMQ의 기본 개념을 바탕으로, 이번에는 실제 프로덕션 환경에서 ActiveMQ를 활용하는 심화 내용을 다루겠습니다.

특히 대규모 트래픽 처리, 장애 복구, 모니터링 등 실무에서 마주할 수 있는 다양한 상황들을 중심으로 설명하겠습니다.

2. ActiveMQ 고급 기능 활용

2.1 메시지 지속성 구현

ActiveMQ에서 중요한 메시지의 유실을 방지하기 위한 지속성(Persistence) 구현 방법을 살펴보겠습니다.

import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.core.JmsTemplate

@Configuration
class ActiveMQConfig {

    @Bean
    fun connectionFactory(): ActiveMQConnectionFactory {
        return ActiveMQConnectionFactory("tcp://localhost:61616").apply {
            isPersistenceEnabled = true
        }
    }

    @Bean
    fun jmsTemplate(): JmsTemplate {
        return JmsTemplate().apply {
            connectionFactory = connectionFactory()
            isDeliveryPersistent = true
        }
    }
}

2.2 클러스터링 구성

고가용성을 위한 ActiveMQ 클러스터 구성 코드입니다.

# application.yml
spring:
  activemq:
    broker-url: failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 10

3. 실전 메시징 패턴 구현

3.1 발행-구독(Pub/Sub) 패턴 구현

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.jms.annotation.JmsListener
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service

@Service
class OrderEventPublisher @Autowired constructor(
    private val jmsTemplate: JmsTemplate
) {

    fun publishOrderEvent(event: OrderEvent) {
        jmsTemplate.convertAndSend("order.topic", event) { message ->
            message.setStringProperty("eventType", event.type)
            message.jmsExpiration = System.currentTimeMillis() + 3600000 // 1시간
            message
        }
    }
}

@Service
class OrderEventSubscriber {

    @JmsListener(destination = "order.topic", selector = "eventType = 'CREATED'")
    fun handleOrderCreated(event: OrderEvent) {
        // 주문 생성 이벤트 처리 로직
        log.info("New order created: {}", event.orderId)
    }

    @JmsListener(destination = "order.topic", selector = "eventType = 'UPDATED'")
    fun handleOrderUpdated(event: OrderEvent) {
        // 주문 수정 이벤트 처리 로직
        log.info("Order updated: {}", event.orderId)
    }
}

3.2 Dead Letter Queue 처리

에러 발생 시 메시지 처리를 위한 DLQ 구현입니다.

import org.apache.activemq.command.ActiveMQQueue
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import javax.jms.JMSException
import javax.jms.Message

@Configuration
class DeadLetterConfig {

    @Bean
    fun orderDLQ(): ActiveMQQueue {
        return ActiveMQQueue("DLQ.order")
    }

    @Service
    class DeadLetterProcessor {

        @JmsListener(destination = "DLQ.order")
        fun processFailedMessages(message: Message) {
            try {
                // 실패한 메시지 재처리 로직
                val messageId = message.jmsMessageID
                log.error("Processing failed message: {}", messageId)
                // 실패 원인 분석 및 처리
            } catch (e: JMSException) {
                log.error("Error processing DLQ message", e)
            }
        }
    }
}

4. 성능 최적화와 모니터링

4.1 성능 모니터링 구현

import io.micrometer.core.instrument.MeterRegistry
import org.apache.activemq.command.ActiveMQQueue
import org.apache.activemq.management.QueueViewMBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
class ActiveMQMonitor @Autowired constructor(
    private val registry: MeterRegistry
) {

    @Scheduled(fixedRate = 60000) // 1분마다 실행
    fun monitorQueueMetrics() {
        try {
            val queue = ActiveMQQueue("order.queue")
            val queueView: QueueViewMBean = getQueueViewMBean(queue)
            
            // 큐 깊이 모니터링
            registry.gauge("activemq.queue.depth", queueView.queueSize)
            
            // 메시지 처리율 모니터링
            registry.gauge("activemq.queue.enqueue.rate", queueView.enqueueCount)
            
            // 컨슈머 수 모니터링
            registry.gauge("activemq.queue.consumer.count", queueView.consumerCount)
        } catch (e: Exception) {
            log.error("Error monitoring ActiveMQ metrics", e)
        }
    }
}

4.2 성능 최적화 설정

import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class ActiveMQOptimizedConfig {

    @Bean
    fun connectionFactory(): ActiveMQConnectionFactory {
        return ActiveMQConnectionFactory().apply {
            // 연결 풀 설정
            isUseAsyncSend = true
            isOptimizeAcknowledge = true

            // 메모리 사용 최적화
            isOptimizedMessageDispatch = true
            isAlwaysSessionAsync = false

            // 압축 설정
            isUseCompression = true
        }
    }
}

5. 장애 대응과 운영 전략

5.1 재시도 정책 구현

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.ConnectionFactory
import org.springframework.jms.listener.DefaultJmsListenerContainerFactory
import org.springframework.util.backoff.ExponentialBackOff
import org.springframework.amqp.AmqpRejectAndDontRequeueException

@Configuration
class RetryConfig {

    @Bean
    fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
        return DefaultJmsListenerContainerFactory().apply {
            setConnectionFactory(connectionFactory)

            // 재시도 정책 설정
            setErrorHandler { t ->
                log.error("Failed to process message", t)
                throw AmqpRejectAndDontRequeueException("Failed to process message", t)
            }

            setBackOff(ExponentialBackOff(1000L, 2.0)) // 초기 대기 시간, 배수 설정
        }
    }
}

5.2 서킷브레이커 패턴 구현

import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import java.time.Duration

@Service
class OrderMessageProcessor {

    private val circuitBreaker: CircuitBreaker

    init {
        val config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofMinutes(1))
            .permittedNumberOfCallsInHalfOpenState(3)
            .slidingWindowSize(10)
            .build()

        circuitBreaker = CircuitBreaker.of("orderProcessor", config)
    }

    @JmsListener(destination = "order.queue")
    fun processOrder(message: OrderMessage) {
        circuitBreaker.executeRunnable {
            // 주문 처리 로직
            processOrderWithRetry(message)
        }
    }
}

결론

이번 포스팅에서는 ActiveMQ의 고급 기능들과 실제 운영 환경에서 필요한 다양한 패턴들을 살펴보았습니다.

특히 메시지 지속성, 클러스터링, 성능 모니터링, 장애 대응 등 실무에서 적용할 수 있는 구체적인코드들을 다뤘습니다.

실제 환경에서는 이러한 패턴들을 상황에 맞게 조합하고 최적화하여 사용하는 것이 중요합니다.

또한, 지속적인 모니터링과 성능 최적화를 통해 안정적인 메시징 시스템을 구축하고 운영할 수 있습니다.

다음 포스팅에서는 이러한 패턴들을 실제 마이크로서비스 아키텍처에 적용하는 방법과 대규모 트래픽 처리를 위한 추가적인 최적화 기법들을 다루도록 하겠습니다.

profile
꾸준히, 의미있는 사이드 프로젝트 경험과 문제해결 과정을 기록하기 위한 공간입니다.

0개의 댓글