[서버] SpringBoot 을 이용한 ActiveMQ 구축하기

최동근·2023년 8월 13일
1

메세지 큐

목록 보기
1/4
post-thumbnail

이번 포스팅에서는 메세지 큐(Message Queue) 을 알아보자 에 이어서실제로 ActiveMQSpring boot 에 적용해보겠습니다 🔥

0. 들어가기 앞서

Spring 에 ActiveMQ 을 구축하기 앞서 정의와 필요한 개념을 먼저 살펴보겠습니다 👨‍💻
ActiveMQJMS(Java Message Service) 을 기반으로 통신을 제어하는 메세지 큐입니다.
또 다른 메세지 브로커인 RabbitMQAMQP 기반으로 동작하는 것과 차이가 있으며, 다른 프로토콜을 사용하기 때문에 두 메세지 브로커간 메세지 교환은 불가능합니다 🚫

[ActiveMQ 용어 정리]

  • Message Broker : 메세지를 목적지로 건네주는 중개자
  • Destination : 메세지를 수신받을 주소지(Consumer 의 주소)
  • Queue : 큐의 메세지를 Consumer 간 경쟁하여 가져가는 모델
  • Topic : Topic 에 메세지를 발행 및 구독하여 메세지를 교환하는 모델, Topic 구독자는 모두 동일하게 메세지를 가져감

메세지 큐(Message Queue) 을 알아보자 포스팅에서도 살펴보았지만 요즘 가장 핫한 이벤트 브로커인 Kafka 는 대용량 분산 시스템 용으로 많이 사용되며, 그 외에 경우에는 ActiveMQRabbitMQ 을 많이 사용합니다.

Spring 애플리케이션과 함께 간단하게 ActiveMQ 을 적용해보면서 사용법에 대한 더 깊은 이해를 해보겠습니다❗️


1. Spring boot ActiveMQ 설정하기

build.gradle에 ActiveMQ 의존성 추가하기

implementation 'org.springframework.boot:spring-boot-starter-activemq'

application.yml 설정값 추가하기

spring:
  activemq:
    broker-url: tcp://localhost:61616 // ActiveMQ 메세지 브로커 서버 포트
    user: admin // ActiveMQ 웹 관리 콘솔 아이디
    password: admin // ActiveMQ 웹 관리 콘솔 비밀번호

activemq:
  queue:
    name: sample-queue // 사용할 Queue 이름

2. docker 을 이용한 ActiveMQ 서버 구동하기

docker run -p 61616:61616 -p 8161:8161 rmohr/activemqdocker

포트 61616는 ActiveMQ 브로커(Broker) 연결, 포트 8161는ActiveMQ 웹 관리 콘솔에 사용됩니다.
결국, ActiveMQ 또한 하나의 서버상에서 구동되는 시스템이기 때문에 이와 같이 Docker 을 이용해서 구축해줍니다.

해당 이미지는 실제 ActiveMQ 웹 관리 콘솔창 이미지입니다.
application.yml 에서 설정한 Queue(Sample-Queue) 가 정상적으로 동작하고 있는 것을 확인할 수 있습니다👨‍💻
뿐만 아니라, 해당 콘솔을 통해 ActiveMQProducer,Consumer,QueueName 등 다양한 정보를 편리하게 확인할 수 있습니다.


3. ActiveMqConfig.class

AcitveMq 의 정상적인 동작을 위해 설정 클래스를 만듭니다.

// ActiveMqConfig.class
@RequiredArgsConstructor
@Configuration
public class ActiveMqConfig {

    @Value("${activemq.queue.name}")
    private String queueName;

	// spring.activeMq prefix 의 설정값을 가지는 클래스
    private final ActiveMQProperties activeMQProperties;

    /**
     * 지정된 queue 이름으로 Queue 빈을 생성
    **/
    @Bean
    public Queue queue() {
        System.out.println(activeMQProperties.getBrokerUrl());
        System.out.println(activeMQProperties.getUser());
        System.out.println(activeMQProperties.getPassword());
        return new ActiveMQQueue(queueName);
    }

    /**
     * activeMQ 는  61616 포트로 구동 중이다.
     * Spring application 에서 해당 서버로 접근해야 한다. ActiveConnectionFactory 로 연결
     */

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(activeMQProperties.getBrokerUrl());
        activeMQConnectionFactory.setUserName(activeMQProperties.getUser());
        activeMQConnectionFactory.setPassword(activeMQProperties.getPassword());
        return activeMQConnectionFactory;
    }

    /**
     * JmsTemplate 은 연결 후 실제 작업을 하기 위한 template
     */
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setExplicitQosEnabled(true);    // 메시지 전송 시 QOS을 설정
        jmsTemplate.setDeliveryPersistent(false);   // 메시지의 영속성을 설정
        jmsTemplate.setReceiveTimeout(1000 * 3);    // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
        jmsTemplate.setTimeToLive(1000 * 60 * 30);  // 메시지의 유효 기간을 설정(30분)
        return jmsTemplate;
    }

    /**
     * JmsListenerContainerFactory 을 위한 빈을 생성
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_typeId");
        Map<String, Class<?>> typeIdMappings = new HashMap<>();
        typeIdMappings.put("message", MessageDto.class);
        converter.setTypeIdMappings(typeIdMappings);
        return converter;
    }
}

해당 클래스를 하나씩 분해해 보겠습니다 ❗️

step 01

@Value("${activemq.queue.name}")
private String queueName;

// spring.activeMq prefix 의 설정값을 가지는 클래스
private final ActiveMQProperties activeMQProperties;

@Value 을 통해 application.yml 파일의 설정한 QueueName 값을 가져옵니다.
또한 ActiveMq 라이브러리를 주입받으면 자동으로 ActiveMQProperties Bean 이 생성됩니다.

// ActiveMQ Class 내부
ConfigurationProperties(prefix = "spring.activemq")
public class ActiveMQProperties {

	/**
	 * URL of the ActiveMQ broker. Auto-generated by default.
	 */
	private String brokerUrl;

	/**
	 * Whether the default broker URL should be in memory. Ignored if an explicit broker
	 * has been specified.
	 */
	private boolean inMemory = true;

	/**
	 * Login user of the broker.
	 */
	private String user;

	/**
	 * Login password of the broker.
	 */
	private String password;

	/**
	 * Time to wait before considering a close complete.
	 */
	private Duration closeTimeout = Duration.ofSeconds(15);

	/**
	 * Whether to stop message delivery before re-delivering messages from a rolled back
	 * transaction. This implies that message order is not preserved when this is enabled.
	 */
	private boolean nonBlockingRedelivery = false;

	/**
	 * Time to wait on message sends for a response. Set it to 0 to wait forever.
	 */
	private Duration sendTimeout = Duration.ofMillis(0);

@ConfigurationProperties 어노테이션을 통해 application.ymlspring.activemq 을 prefix 로 하는 설정값들을 가지는 클래스가 만들어지는 것을 확인할 수 있습니다.

추가적으로 @ConfigurationProperties 대상이 되는 클래스를 사용하기 위해서는 @EnableConfigurationProperties 을 꼭 선언해야합니다.

@EnableConfigurationProperties(ActiveMQProperties.class) // 필수 ❗️
@SpringBootApplication
public class RedisApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisApplication.class,args);
    }
}

step 02


	/**
     * activeMQ 는  61616 포트로 구동 중이다.
     * Spring application 에서 해당 서버로 접근해야 한다. ActiveConnectionFactory 로 연결
     */

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(activeMQProperties.getBrokerUrl());
        activeMQConnectionFactory.setUserName(activeMQProperties.getUser());
        activeMQConnectionFactory.setPassword(activeMQProperties.getPassword());
        return activeMQConnectionFactory;
    }

    /**
     * JmsTemplate 은 연결 후 실제 작업을 하기 위한 template
     */
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setExplicitQosEnabled(true);    // 메시지 전송 시 QOS을 설정
        jmsTemplate.setDeliveryPersistent(false);   // 메시지의 영속성을 설정
        jmsTemplate.setReceiveTimeout(1000 * 3);    // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
        jmsTemplate.setTimeToLive(1000 * 60 * 30);  // 메시지의 유효 기간을 설정(30분)
        return jmsTemplate;
    }

앞서 도커로 띄운 61616 포트의 ActiveMQ 브로커는 ActiveMQConnectionFactory 을 통해 연결을 합니다.
연결을 했으면 ActiveMQ 을 통해 실제 메세지 송/수신을 가능하게 하는 것이 필요하겠죠?
ActiveMQJMS 기반으로 동작하기 때문에 JMS 와 관련된 template 을 제공합니다.
JmsTemplate 이 해당 역할을 수행합니다 ❗️

step 03

    /**
     * JmsListenerContainerFactory 을 위한 빈을 생성
     *
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_typeId");
        Map<String, Class<?>> typeIdMappings = new HashMap<>();
        typeIdMappings.put("message", MessageDto.class);
        converter.setTypeIdMappings(typeIdMappings);
        return converter;
    }

JmsListenerContainerFactory Bean 은 해당 메세지를 소비하는 Listener 에 대한 클래스라고 생각하면 됩니다.

다른 서버끼리의 통신을 통해 메세지를 주고받을 때 메세지를 변환(직렬화)시켜주는 MessageConverter 가 필요하겠죠?
여기서는 많이 사용되는 MappingJackson2MessageConverter 을 사용하고자 합니다.

Spring 에서는 다양한 Message Converter 을 제공하는데 [Cache] Redis 직렬화 방법에 대해서 을 참고해주세요❗️


4. MessageDto

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class MessageDto {

    private String title;
    private String content;
}

MessageDtoProducerActiveMQ 을 통해 Consumer 에게 발행하게될 메세지를 나타내는 클래스입니다.


4. MessageService

실제 메세지의 전달 및 수신에 대한 로직이 있는 클래스입니다.

@Slf4j
@RequiredArgsConstructor
@Service
public class MessageService {

    @Value("${activemq.queue.name}")
    private String queueName;

    // jmsTemplate 을 통해 메세지 송신 가능
    private final JmsTemplate jmsTemplate;

    /**
     * Queue 로 메세지를 발행
     * messageDto -> Producer 가 Queue 발행한 메세지 Class
     */
    public void sendMessage(MessageDto messageDto) {
        log.info("message sent : {}", messageDto.toString());
        // queueName(Sample-queue) 에 메세지 전송
        jmsTemplate.convertAndSend(queueName,messageDto);
    }

    @JmsListener(destination = "${activemq.queue.name}")
    public void receiveMessage(MessageDto messageDto) {
        log.info("Received message : {}",messageDto.toString());
    }
}

저는 간단한 실습을 위해 하나의 서버에 ProducerConsumer 을 구현했습니다.

sendMessage 메소드는 Producer 의 로직을 가집니다.
Controller 에서 MessageDto 을 받아서 jmsTemplate 의 convertAndSend 메소드를 통해 앞서 만든 sample-queue 에 메세지를 전송합니다 👨‍💻

이렇게 메세지를 보낸 후 receiveMessage 메소드에서 수신을 할 것인데
이때 라이브러리에서 제공하는 @JmsListener 어노테이션을 통해 destination 을 지정해줍니다.
이를 통해 해당 메소드는 sample-queue 로 들어오는 메세지를 수신할 수 있습니다.


5. MessageController

@Slf4j
@RequiredArgsConstructor
@RestController
public class MessageController {

    private final MessageService messageService;

    @PostMapping("/send/message")
    public ResponseEntity<String> sendMessage(@RequestBody MessageDto messageDto) {
        this.messageService.sendMessage(messageDto);
        return ResponseEntity.ok("Message send to ActiveMQ!");
    }
}

MessageController 을 만들어서 Postman 을 이용해 테스트 해보겠습니다❗️


6. TEST

정상적으로 테스트가 진행된 것을 확인할 수 있습니다 👏


참고

[ActiveMQ] ActiveMQ 란? (feat. 여러 MQ 정리) JMS, MOM
Springboot에서 ActiveMQ로 메세시 송수신하기
[Spring Boot] ActiveMQ 연동하기

profile
비즈니스가치를추구하는개발자

1개의 댓글

comment-user-thumbnail
2023년 12월 6일

좋은 글 감사합니다~

답글 달기