이번 포스팅에서는 Spring에서 ActiveMQ를 세팅하고 구현하는 과정에 대해 기술하였습니다.
Spring에서 ActiveMQ 클라이언트를 구현하기 앞서 필요한 JMS 주요 개념을 먼저 살펴보겠습니다.
ActiveMQ는 Java로 개발된 MQ 미들웨어로, JMS를 기반으로 통신을 제어합니다.
즉, ActiveMQ는 JMS의 구현체입니다.
JMS란?
MOM (메시지 지향 미들웨어)를 구현한 자바 API
공통 인터페이스
메세지 방식 인터페이스
메시지 송수신 인터페이스
@RequiredArgsConstructor
@Slf4j
class MessageService {
public boolean send(String message) {
// 1. ActiveMQ ConnectionFactory 생성
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
// 2. 목적지(Destination) 생성
// 2.1 큐(Queue) 생성
Queue queue = new ActiveMQQueue("Queue 이름");
// 2.2 토픽(Topic) 생성
Topic topic = new ActiveMQTopic("Topic 이름");
// 6. Try 문에서 객체를 생성하여 Auto-close
try (ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE)) {
// 3. 세션 취득
connection.start();
// 4. 목적지 전송
// 4.1 Producer 생성
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 4.2 Message 생성
Message m = session.createTextMessage(message);
// 4.3 Message 전송
producer.send(m);
log.info("ActiveMQ 메세지 전송. 메세지: {}", message);
return true;
}
// 5. Exception 처리
catch (JMSException e) {
log.error("JMS 에러 발생. 메시지: {}", e.getMessage());
return false;
}
}
}
자바에서 제공하는 기본 API인 JMS를 이용하면 JDBC와 유사하게 연결부터 데이터 송수신까지 수많은 보일러플레이트 코드를 작성해야합니다.
스프링은 템플릿 기반으로 JMS 코드를 단순화하는 솔루션을 제공합니다. JMS 템플릿 클래스 JmsTemplate을 이용하면 소량의 코드만으로도 JMS 메시지를 주고받을 수 있습니다.
우선 테스트 프로그램을 만들기 위해 Spring Initializr를 이용해 프로젝트를 생성하였습니다.
- 프로젝트명: mqclient
- Java: 17
- Spring Boot: 3.3.1
프로젝트 환경 설정 | 종속성 추가 |
---|---|
이번 포스팅에서 다루지는 않을 예정이지만, Spring Web + Thymeleaf + WebSocket을 추가하여 Front에서 기능을 구현하면 실시간 채팅 애플리케이션을 구현할 수 있습니다
추가적으로 Lombok 및 Json 관련 라이브러리를 추가해주었습니다
<properties>
<java.version>17</java.version>
<jackson.version>2.17.2</jackson.version>
</properties>
<!-- Lombok -->
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- JSON -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20231013</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
롬복 플러그인 설정
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
최신버전은 6.x 지만 라이브러리가 Spring for Apache ActiveMQ 5 이므로 5.x를 다운받았습니다.
application.yml
파일에 사용할 브로커의 설정 값을 입력합니다.
spring:
activemq:
broker-url: tcp://localhost:61616 # ActiveMQ Broker URL
user: admin # ActiveMQ 웹 관리 콘솔 아이디
password: admin # ActiveMQ 웹 관리 콘솔 비밀번호
이 설정 값들은 ActiveMQProperties 객체로 활용이 가능합니다.
// ActiveMQ Class 내부
@ConfigurationProperties(
prefix = "spring.activemq"
)
public class ActiveMQProperties {
private static final String DEFAULT_NETWORK_BROKER_URL
= "tcp://localhost:61616";
private String brokerUrl;
private String user;
private String password;
private Duration closeTimeout = Duration.ofSeconds(15L);
private boolean nonBlockingRedelivery = false;
private Duration sendTimeout = Duration.ofMillis(0L);
@NestedConfigurationProperty
private final JmsPoolConnectionFactoryProperties pool =
new JmsPoolConnectionFactoryProperties();
private final Packages packages = new Packages();
}
ActiveMQ에서 사용할 Bean들을 관리하기 위한 클래스를 생성하고, @Configuration을 이용하여 설정합니다.
ActiveMQ에서 사용할 Bean 들에는 ConnectionFactory, JmsTemplate, Queue, Topic, MessageConverter, JmsListenerContainerFactory 등이 있습니다.
초기 설정에는 ConnectionFactory 및 JmsTemplate을 추가해주었습니다.
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
@Configuration
@EnableConfigurationProperties(ActiveMQProperties.class)
public class ActiveMQConfig {
@Bean
public ActiveMQConnectionFactory connectionFactory(ActiveMQProperties properties) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(properties.getBrokerUrl());
factory.setUserName(properties.getUser());
factory.setPassword(properties.getPassword());
factory.setCloseTimeout(properties.getCloseTimeout().toMillisPart());
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory) {
JmsTemplate template = new JmsTemplate(factory);
template.setExplicitQosEnabled(true); // 메시지 전송 시 QOS을 설정
template.setDeliveryPersistent(false); // 메시지의 영속성을 설정
template.setReceiveTimeout(1000 * 3); // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
template.setTimeToLive(1000 * 60 * 30); // 메시지의 유효 기간을 설정(30분)
return template;
}
}
@EnableConfigurationProperties 어노테이션을 활용하여 ActiveMQProperties 객체를 바인딩해주었습니다
Java 객체를 JSON 형태로 변환하기 위해서는 Converter를 등록해야합니다.
ActiveMQConfig 클래스에 다음과 같이 Converter를 추가해줍니다.
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
// VO 타입 추가하기
Map<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("chat", ChatMessage.class);
essageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter;
}
추가한 Converter를 JmsTemplate에 추가해줍니다.
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory,
MappingJackson2MessageConverter messageConverter) {
JmsTemplate template = new JmsTemplate(factory);
template.setMessageConverter(messageConverter); // 메시지 Converter 설정
... (중략)
return template;
}
이제 간단한 Message 송수신을 구현해볼 차례입니다!
우선, 메시지를 VO 객체로 구현합니다.
저는 Chatting Message를 구현했습니다.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
private String userId;
private String msg;
@JsonSerialize(using = CustomLocalDateTimeSerializer.class)
@JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
private LocalDateTime date;
}
직렬화, 역직렬화 예시를 보여주기 위해 VO 객체 필드에 많이 사용하는 LocalDateTime 객체를 추가했습니다.
LocalDateTime 객체는 json 형태로 자동 변환을 지원하지 않아서 직렬화 및 역직렬화를 직접 구현해야합니다.
// 직렬화 코드
@JsonComponent
public class CustomLocalDateTimeSerializer extends StdSerializer<LocalDateTime> {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
public CustomLocalDateTimeSerializer() {
this(null);
}
protected CustomLocalDateTimeSerializer(Class<LocalDateTime> t) {
super(t);
}
@Override
public void serialize(LocalDateTime localDateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(localDateTime.format(DATE_TIME_FORMATTER));
}
}
// 역직렬화 코드
@JsonComponent
public class CustomLocalDateTimeDeserializer extends StdDeserializer<LocalDateTime> {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
public CustomLocalDateTimeDeserializer() {
this(null);
}
public CustomLocalDateTimeDeserializer(Class<?> vc) {
super(vc);
}
@Override
public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
JsonNode node = jsonParser.getCodec().readTree(jsonParser);
String dateString = node.asText();
return LocalDateTime.parse(dateString, DATE_TIME_FORMATTER);
}
}
현업에서는 메세지 프로토콜이 이미 정의되어 있기 때문에 Destination (Queue, Topic) 등을 String으로 보낼 필요가 없습니다.
application.yml 파일에 사용할 Queue와 Topic의 이름을 관리합니다.
# 파일명: application.yml
activemq:
queue:
chat: chat
topic:
chat: chat
환경설정 클래스(ActiveMQConfig)에 Bean을 추가하여 해당 Destination을 싱글톤 객체로 관리해줍니다
@Bean
public ActiveMQQueue chatQueue(@Value("${activemq.queue.chat}") String queueName) {
return new ActiveMQQueue(queueName);
}
@Bean
public Topic chatTopic(@Value("${activemq.topic.chat}") String topicName) {
return new ActiveMQTopic(topicName);
}
Topic과 Queue를 보내는 간단한 인터페이스입니다.
public interface ChattingService {
boolean send(Destination destination, ChatMessage message);
boolean sendQueue(ChatMessage message);
boolean sendTopic(ChatMessage message);
}
JmsTemplate을 이용하여 간편하게 구현이 가능합니다.
이미 정의한 Queue, Topic을 Autowiring 하여 메시지를 전송합니다.
@Service
@RequiredArgsConstructor
@Slf4j
public class ChatServiceImpl implements ChatService {
private final JmsTemplate template;
private final Queue chatQueue;
private final Topic chatTopic;
@Override
public boolean send(Destination destination, ChatMessage message) {
try {
template.convertAndSend(destination, message);
return true;
} catch (JmsException exception) {
log.error("JMS Exception 발생: {}", exception.getMessage());
exception.getStackTrace();
return false;
}
}
@Override
public boolean sendQueue(ChatMessage message) {
log.info("[Queue] send message: {}", message);
return send(chatQueue, message);
}
@Override
public boolean sendTopic(ChatMessage message) {
log.info("[Topic] send message: {}", message);
return send(chatTopic, message);
}
}
메시지를 수신받기 위해서는 메시지 수신 컨테이너를 등록해야합니다. 환경설정 클래스에서 JmsListenerContainerFactory 빈을 등록해줍니다.
@Bean
public JmsListenerContainerFactory<?> queueContainerFactory(
ActiveMQConnectionFactory connectionFactory, MappingJackson2MessageConverter messageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter);
return factory;
}
@Bean
public JmsListenerContainerFactory<?> topicContainerFactory(
ActiveMQConnectionFactory connectionFactory,
MappingJackson2MessageConverter messageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// Topic 구조로 설정
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter);
return factory;
}
메시지 리스너를 구현하는 방법에는 두가지가 있습니다
메소드 단에서 구현
@JmsListener 어노테이션을 이용하면 리스너를 손쉽게 구현할 수 있습니다
@Component
@Slf4j
public class ChatListener {
@JmsListener(destination = "${activemq.queue.chat}", containerFactory = "queueContainerFactory")
public void receiveQueue(ChatMessage message) {
log.info("[Queue] receive message: {}", message);
}
@JmsListener(destination = "${activemq.topic.chat}", containerFactory = "topicContainerFactory")
public void receiveTopic(ChatMessage message) {
log.info("[Topic] receive message: {}", message);
}
}
클래스 단에서 구현
또 다른 방법으로는 MessageListener를 구현하여 DefaultMessageListenerContainer에 등록하는 방법입니다.
먼저 리스너를 구현할 클래스를 생성하여 MessageListener 인터페이스를 implements 합니다.
@Component
@Slf4j
public class TopicListener implements MessageListener {
@Override
public void onMessage(Message message) {
log.info("[Topic Object] receive message: {}", message);
}
}
그런 다음 DefaultMessageListenerContainer 빈을 추가하여 구현한 MessageListener를 등록합니다.
@Bean
public DefaultMessageListenerContainer topicContainerFactory2(
ActiveMQConnectionFactory factory, Topic topic, TopicListener listener) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(factory);
container.setDestination(topic);
// 1. 구현한 클래스 등록
container.setMessageListener(listener);
// 2. 람다식 구현
container.setMessageListener((MessageListener) message -> {
log.info("[Topic - Lambda] receive message: {}", message);
});
return container;
}
꼭 클래스 파일을 생성하지 않아도 람다 익명 객체를 이용하여 메시지 리스너를 구현할 수도 있습니다.
메시지 리스너를 클래스로 구현할 때 한가지 아쉬운 점은 MessageListener가 Message 객체를 처리할 것을 요구하기 때문에 VO로 바로 받기 어렵다는 점입니다
Message 객체를 로깅해보면 아주 아주 많은 정보를 담고 있는 것을 볼 수 있습니다.
ActiveMQBytesMessage {
commandId = 5, responseRequired = false,
messageId = ID:DESKTOP-05O71ON-63104-1720286743523-1:5:1:1:1,
originalDestination = null, originalTransactionId = null,
producerId = ID:DESKTOP-05O71ON-63104-1720286743523-1:5:1:1,
destination = topic://TOPIC_CHAT,
transactionId = null,
deliveryTime = 0, expiration = 1720288545781, timestamp = 1720286745781, arrival = 0,
brokerInTime = 1720286745782, brokerOutTime = 1720286745794,
correlationId = null, replyTo = null, persistent = false,
type = null, priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null,
content = org.apache.activemq.util.ByteSequence@656d5cea,
marshalledProperties = org.apache.activemq.util.ByteSequence@38811fe,
dataStructure = null, redeliveryCounter = 0, size = 0,
properties = {_typeId=chat}, readOnlyProperties = true,
readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null
}
Message To Object
이 Message를 parsing하기 위해서는 아래와 같이 구현이 가능합니다.
@Component
@Slf4j
public class TopicListener implements MessageListener {
@Override
public void onMessage(Message message) {
log.info("[Topic Object] receive message: {}", message);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
ChatMessage body = null;
try {
body = textMessage.getBody(ChatMessage.class);
} catch (JMSException e) {
throw new RuntimeException(e);
}
log.info("[Topic Object] receive body: {}", body);
}
}
}
메시지 송수신이 잘 이루어지고 있는지 테스트 코드를 작성하여 확인하였습니다.
@SpringBootTest
class MqclientApplicationTests {
private final ChatService chatService;
@Autowired
MqclientApplicationTests(ChatService chatService) {
this.chatService = chatService;
}
@Test
@DisplayName("Queue 테스트")
void topicQueue() throws JMSException {
ChatMessage message = ChatMessage.builder()
.userId("twinklekhj")
.msg("안녕하세요!")
.date(LocalDateTime.now())
.build();
Assertions.assertTrue(chatService.sendQueue(message), "Queue 보내기 실패");
}
@Test
@DisplayName("Topic 테스트")
void topicTopic() throws JMSException {
ChatMessage message = ChatMessage.builder()
.userId("twinklekhj")
.msg("좋은 하루 보내세요~")
.date(LocalDateTime.now())
.build();
Assertions.assertTrue(chatService.sendTopic(message), "Topic 보내기 실패");
}
}
테스트 로그는 다음과 같습니다!
안녕하세요~ 좋은 하루보내세요😊🙌
이미 ActiveMQ 클라이언트 프로그램을 완벽하게 구현했다고 생각했지만, 내용을 정리하면서 기존 코드가 얼마나 깔끔하지 못했는지 다시 한번 회고하게 되는 시간이었습니다😢