이번 포스팅에서는 메세지 큐(Message Queue) 을 알아보자 에 이어서실제로 ActiveMQ
을 Spring boot
에 적용해보겠습니다 🔥
Spring 에 ActiveMQ
을 구축하기 앞서 정의와 필요한 개념을 먼저 살펴보겠습니다 👨💻
ActiveMQ
는 JMS(Java Message Service)
을 기반으로 통신을 제어하는 메세지 큐입니다.
또 다른 메세지 브로커인 RabbitMQ
는 AMQP
기반으로 동작하는 것과 차이가 있으며, 다른 프로토콜을 사용하기 때문에 두 메세지 브로커간 메세지 교환은 불가능합니다 🚫
[ActiveMQ 용어 정리]
메세지 큐(Message Queue) 을 알아보자 포스팅에서도 살펴보았지만 요즘 가장 핫한 이벤트 브로커인 Kafka
는 대용량 분산 시스템 용으로 많이 사용되며, 그 외에 경우에는 ActiveMQ
와 RabbitMQ
을 많이 사용합니다.
Spring 애플리케이션과 함께 간단하게 ActiveMQ
을 적용해보면서 사용법에 대한 더 깊은 이해를 해보겠습니다❗️
implementation 'org.springframework.boot:spring-boot-starter-activemq'
spring:
activemq:
broker-url: tcp://localhost:61616 // ActiveMQ 메세지 브로커 서버 포트
user: admin // ActiveMQ 웹 관리 콘솔 아이디
password: admin // ActiveMQ 웹 관리 콘솔 비밀번호
activemq:
queue:
name: sample-queue // 사용할 Queue 이름
docker run -p 61616:61616 -p 8161:8161 rmohr/activemqdocker
포트 61616
는 ActiveMQ 브로커(Broker) 연결, 포트 8161
는ActiveMQ 웹 관리 콘솔에 사용됩니다.
결국, ActiveMQ
또한 하나의 서버상에서 구동되는 시스템이기 때문에 이와 같이 Docker 을 이용해서 구축해줍니다.
해당 이미지는 실제 ActiveMQ
웹 관리 콘솔창 이미지입니다.
application.yml
에서 설정한 Queue(Sample-Queue) 가 정상적으로 동작하고 있는 것을 확인할 수 있습니다👨💻
뿐만 아니라, 해당 콘솔을 통해 ActiveMQ
의 Producer
,Consumer
,QueueName
등 다양한 정보를 편리하게 확인할 수 있습니다.
AcitveMq
의 정상적인 동작을 위해 설정 클래스를 만듭니다.
// ActiveMqConfig.class
@RequiredArgsConstructor
@Configuration
public class ActiveMqConfig {
@Value("${activemq.queue.name}")
private String queueName;
// spring.activeMq prefix 의 설정값을 가지는 클래스
private final ActiveMQProperties activeMQProperties;
/**
* 지정된 queue 이름으로 Queue 빈을 생성
**/
@Bean
public Queue queue() {
System.out.println(activeMQProperties.getBrokerUrl());
System.out.println(activeMQProperties.getUser());
System.out.println(activeMQProperties.getPassword());
return new ActiveMQQueue(queueName);
}
/**
* activeMQ 는 61616 포트로 구동 중이다.
* Spring application 에서 해당 서버로 접근해야 한다. ActiveConnectionFactory 로 연결
*/
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(activeMQProperties.getBrokerUrl());
activeMQConnectionFactory.setUserName(activeMQProperties.getUser());
activeMQConnectionFactory.setPassword(activeMQProperties.getPassword());
return activeMQConnectionFactory;
}
/**
* JmsTemplate 은 연결 후 실제 작업을 하기 위한 template
*/
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
jmsTemplate.setExplicitQosEnabled(true); // 메시지 전송 시 QOS을 설정
jmsTemplate.setDeliveryPersistent(false); // 메시지의 영속성을 설정
jmsTemplate.setReceiveTimeout(1000 * 3); // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
jmsTemplate.setTimeToLive(1000 * 60 * 30); // 메시지의 유효 기간을 설정(30분)
return jmsTemplate;
}
/**
* JmsListenerContainerFactory 을 위한 빈을 생성
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("message", MessageDto.class);
converter.setTypeIdMappings(typeIdMappings);
return converter;
}
}
해당 클래스를 하나씩 분해해 보겠습니다 ❗️
@Value("${activemq.queue.name}")
private String queueName;
// spring.activeMq prefix 의 설정값을 가지는 클래스
private final ActiveMQProperties activeMQProperties;
@Value
을 통해 application.yml
파일의 설정한 QueueName 값을 가져옵니다.
또한 ActiveMq
라이브러리를 주입받으면 자동으로 ActiveMQProperties
Bean 이 생성됩니다.
// ActiveMQ Class 내부
ConfigurationProperties(prefix = "spring.activemq")
public class ActiveMQProperties {
/**
* URL of the ActiveMQ broker. Auto-generated by default.
*/
private String brokerUrl;
/**
* Whether the default broker URL should be in memory. Ignored if an explicit broker
* has been specified.
*/
private boolean inMemory = true;
/**
* Login user of the broker.
*/
private String user;
/**
* Login password of the broker.
*/
private String password;
/**
* Time to wait before considering a close complete.
*/
private Duration closeTimeout = Duration.ofSeconds(15);
/**
* Whether to stop message delivery before re-delivering messages from a rolled back
* transaction. This implies that message order is not preserved when this is enabled.
*/
private boolean nonBlockingRedelivery = false;
/**
* Time to wait on message sends for a response. Set it to 0 to wait forever.
*/
private Duration sendTimeout = Duration.ofMillis(0);
@ConfigurationProperties
어노테이션을 통해 application.yml
에 spring.activemq
을 prefix 로 하는 설정값들을 가지는 클래스가 만들어지는 것을 확인할 수 있습니다.
추가적으로 @ConfigurationProperties
대상이 되는 클래스를 사용하기 위해서는 @EnableConfigurationProperties
을 꼭 선언해야합니다.
@EnableConfigurationProperties(ActiveMQProperties.class) // 필수 ❗️
@SpringBootApplication
public class RedisApplication {
public static void main(String[] args) {
SpringApplication.run(RedisApplication.class,args);
}
}
/**
* activeMQ 는 61616 포트로 구동 중이다.
* Spring application 에서 해당 서버로 접근해야 한다. ActiveConnectionFactory 로 연결
*/
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(activeMQProperties.getBrokerUrl());
activeMQConnectionFactory.setUserName(activeMQProperties.getUser());
activeMQConnectionFactory.setPassword(activeMQProperties.getPassword());
return activeMQConnectionFactory;
}
/**
* JmsTemplate 은 연결 후 실제 작업을 하기 위한 template
*/
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
jmsTemplate.setExplicitQosEnabled(true); // 메시지 전송 시 QOS을 설정
jmsTemplate.setDeliveryPersistent(false); // 메시지의 영속성을 설정
jmsTemplate.setReceiveTimeout(1000 * 3); // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
jmsTemplate.setTimeToLive(1000 * 60 * 30); // 메시지의 유효 기간을 설정(30분)
return jmsTemplate;
}
앞서 도커로 띄운 61616
포트의 ActiveMQ
브로커는 ActiveMQConnectionFactory
을 통해 연결을 합니다.
연결을 했으면 ActiveMQ
을 통해 실제 메세지 송/수신을 가능하게 하는 것이 필요하겠죠?
ActiveMQ
는 JMS
기반으로 동작하기 때문에 JMS
와 관련된 template 을 제공합니다.
JmsTemplate
이 해당 역할을 수행합니다 ❗️
/**
* JmsListenerContainerFactory 을 위한 빈을 생성
*
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("message", MessageDto.class);
converter.setTypeIdMappings(typeIdMappings);
return converter;
}
JmsListenerContainerFactory
Bean 은 해당 메세지를 소비하는 Listener 에 대한 클래스라고 생각하면 됩니다.
다른 서버끼리의 통신을 통해 메세지를 주고받을 때 메세지를 변환(직렬화)시켜주는 MessageConverter
가 필요하겠죠?
여기서는 많이 사용되는 MappingJackson2MessageConverter
을 사용하고자 합니다.
Spring 에서는 다양한 Message Converter
을 제공하는데 [Cache] Redis 직렬화 방법에 대해서 을 참고해주세요❗️
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class MessageDto {
private String title;
private String content;
}
MessageDto
는 Producer
가 ActiveMQ
을 통해 Consumer
에게 발행하게될 메세지를 나타내는 클래스입니다.
실제 메세지의 전달 및 수신에 대한 로직이 있는 클래스입니다.
@Slf4j
@RequiredArgsConstructor
@Service
public class MessageService {
@Value("${activemq.queue.name}")
private String queueName;
// jmsTemplate 을 통해 메세지 송신 가능
private final JmsTemplate jmsTemplate;
/**
* Queue 로 메세지를 발행
* messageDto -> Producer 가 Queue 발행한 메세지 Class
*/
public void sendMessage(MessageDto messageDto) {
log.info("message sent : {}", messageDto.toString());
// queueName(Sample-queue) 에 메세지 전송
jmsTemplate.convertAndSend(queueName,messageDto);
}
@JmsListener(destination = "${activemq.queue.name}")
public void receiveMessage(MessageDto messageDto) {
log.info("Received message : {}",messageDto.toString());
}
}
저는 간단한 실습을 위해 하나의 서버에 Producer
와 Consumer
을 구현했습니다.
sendMessage
메소드는 Producer
의 로직을 가집니다.
Controller
에서 MessageDto
을 받아서 jmsTemplate 의 convertAndSend
메소드를 통해 앞서 만든 sample-queue
에 메세지를 전송합니다 👨💻
이렇게 메세지를 보낸 후 receiveMessage
메소드에서 수신을 할 것인데
이때 라이브러리에서 제공하는 @JmsListener
어노테이션을 통해 destination 을 지정해줍니다.
이를 통해 해당 메소드는 sample-queue
로 들어오는 메세지를 수신할 수 있습니다.
@Slf4j
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@PostMapping("/send/message")
public ResponseEntity<String> sendMessage(@RequestBody MessageDto messageDto) {
this.messageService.sendMessage(messageDto);
return ResponseEntity.ok("Message send to ActiveMQ!");
}
}
MessageController
을 만들어서 Postman 을 이용해 테스트 해보겠습니다❗️
정상적으로 테스트가 진행된 것을 확인할 수 있습니다 👏
[ActiveMQ] ActiveMQ 란? (feat. 여러 MQ 정리) JMS, MOM
Springboot에서 ActiveMQ로 메세시 송수신하기
[Spring Boot] ActiveMQ 연동하기
좋은 글 감사합니다~