[Spring Boot] ActiveMQ (2) - Spring Boot에서 ActiveMQ 사용하기

김희정·2024년 7월 6일
0

Spring

목록 보기
15/18

💎 들어가며

이번 포스팅에서는 Spring에서 ActiveMQ를 세팅하고 구현하는 과정에 대해 기술하였습니다.


1. 구현에 앞서

Spring에서 ActiveMQ 클라이언트를 구현하기 앞서 필요한 JMS 주요 개념을 먼저 살펴보겠습니다.


1.1 JMS

ActiveMQ는 Java로 개발된 MQ 미들웨어로, JMS를 기반으로 통신을 제어합니다.
즉, ActiveMQ는 JMS의 구현체입니다.

JMS란?

MOM (메시지 지향 미들웨어)를 구현한 자바 API


JMS의 주요 구성요소

공통 인터페이스

  • ConnectionFactory: Connection 연결 설정 정보 관리 인터페이스.
  • Session: 연결 세션 관리 인터페이스. Producer, Consumer, Listener, Message 등 관리 기능 수행.
  • Destination: 목적지 인터페이스. 메세지를 수신받을 목적지 (Consumer 의 주소)
  • Message: 전송 할 메시지 인터페이스.

메세지 방식 인터페이스

  • Queue: Queue는 특정 Consumer에게 메시지 전송
  • Topic: Topic은 발행(Publish)-구독(Subscribe) 모델로, Topic 구독자에게 메시지 전송

메시지 송수신 인터페이스

  • MessageProducer: 메시지 전송 인터페이스
  • MessageConsumer: 메시지 수신 처리 인터페이스
  • MessageListener: 메시지 수신 처리 콜백 인터페이스

JMS 연결 순서

  1. 메시지 브로커 연결을 위한 Connection Factory 생성
  2. 큐, 토픽 중 방식을 선택하여 Destination 생성
  3. 브로커 서버 연결 ⇒ 세션 취득
  4. 메시지 ProducerConsumer를 생성하여 메시지 송수신
  5. Exception 처리
  6. 세션 및 연결 닫기

Java 예시 코드

@RequiredArgsConstructor
@Slf4j
class MessageService {
  public boolean send(String message) {
      // 1. ActiveMQ ConnectionFactory 생성
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
      Connection connection = connectionFactory.createConnection();
      
      // 2. 목적지(Destination) 생성
      // 2.1 큐(Queue) 생성
      Queue queue = new ActiveMQQueue("Queue 이름");
      
      // 2.2 토픽(Topic) 생성
      Topic topic = new ActiveMQTopic("Topic 이름");

      // 6. Try 문에서 객체를 생성하여 Auto-close
      try (ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
           ActiveMQSession session = (ActiveMQSession) connection.createSession(false,  ActiveMQSession.AUTO_ACKNOWLEDGE)) {
          
          // 3. 세션 취득
          connection.start();
          
          // 4. 목적지 전송
          // 4.1 Producer 생성
          MessageProducer producer = session.createProducer(queue);
          producer.setDeliveryMode(DeliveryMode.PERSISTENT);
          
          // 4.2 Message 생성
          Message m = session.createTextMessage(message);
          
          // 4.3 Message 전송
          producer.send(m);

          log.info("ActiveMQ 메세지 전송. 메세지: {}", message);

          return true;
      }
      // 5. Exception 처리
      catch (JMSException e) {
          log.error("JMS 에러 발생. 메시지: {}", e.getMessage());
          return false;
      }
  }
}

JmsTemplate

자바에서 제공하는 기본 API인 JMS를 이용하면 JDBC와 유사하게 연결부터 데이터 송수신까지 수많은 보일러플레이트 코드를 작성해야합니다.

스프링은 템플릿 기반으로 JMS 코드를 단순화하는 솔루션을 제공합니다. JMS 템플릿 클래스 JmsTemplate을 이용하면 소량의 코드만으로도 JMS 메시지를 주고받을 수 있습니다.


2. Spring Boot 설정

2.1 Project Setting

프로젝트 생성

우선 테스트 프로그램을 만들기 위해 Spring Initializr를 이용해 프로젝트를 생성하였습니다.

  • 프로젝트명: mqclient
  • Java: 17
  • Spring Boot: 3.3.1
프로젝트 환경 설정종속성 추가

이번 포스팅에서 다루지는 않을 예정이지만, Spring Web + Thymeleaf + WebSocket을 추가하여 Front에서 기능을 구현하면 실시간 채팅 애플리케이션을 구현할 수 있습니다


라이브러리 추가

추가적으로 Lombok 및 Json 관련 라이브러리를 추가해주었습니다

<properties>
  <java.version>17</java.version>
  <jackson.version>2.17.2</jackson.version>
</properties>

<!-- Lombok -->
<dependencies>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>

  <!-- JSON -->
  <dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20231013</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>${jackson.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>${jackson.version}</version>
  </dependency>
</dependencies>

롬복 플러그인 설정

<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
  <configuration>
    <excludes>
      <exclude>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
      </exclude>
    </excludes>
  </configuration>
</plugin>

ActiveMQ 5.18.4 다운로드

최신버전은 6.x 지만 라이브러리가 Spring for Apache ActiveMQ 5 이므로 5.x를 다운받았습니다.
ActiveMQ 5.18.4 버전


2.2 Configuration

application.yml

application.yml 파일에 사용할 브로커의 설정 값을 입력합니다.

spring:
  activemq:
    broker-url: tcp://localhost:61616 # ActiveMQ Broker URL
    user: admin       # ActiveMQ 웹 관리 콘솔 아이디
    password: admin   # ActiveMQ 웹 관리 콘솔 비밀번호

이 설정 값들은 ActiveMQProperties 객체로 활용이 가능합니다.

// ActiveMQ Class 내부
@ConfigurationProperties(
    prefix = "spring.activemq"
)
public class ActiveMQProperties {
    private static final String DEFAULT_NETWORK_BROKER_URL
    				= "tcp://localhost:61616";
    private String brokerUrl;
    private String user;
    private String password;
    private Duration closeTimeout = Duration.ofSeconds(15L);
    private boolean nonBlockingRedelivery = false;
    private Duration sendTimeout = Duration.ofMillis(0L);
    @NestedConfigurationProperty
    private final JmsPoolConnectionFactoryProperties pool = 
    				new JmsPoolConnectionFactoryProperties();
    private final Packages packages = new Packages();
}

Config 클래스 생성

ActiveMQ에서 사용할 Bean들을 관리하기 위한 클래스를 생성하고, @Configuration을 이용하여 설정합니다.

ActiveMQ에서 사용할 Bean 들에는 ConnectionFactory, JmsTemplate, Queue, Topic, MessageConverter, JmsListenerContainerFactory 등이 있습니다.


초기 설정에는 ConnectionFactoryJmsTemplate을 추가해주었습니다.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
@EnableConfigurationProperties(ActiveMQProperties.class)
public class ActiveMQConfig {
    @Bean
    public ActiveMQConnectionFactory connectionFactory(ActiveMQProperties properties) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(properties.getBrokerUrl());
        factory.setUserName(properties.getUser());
        factory.setPassword(properties.getPassword());
        factory.setCloseTimeout(properties.getCloseTimeout().toMillisPart());

        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory) {
        JmsTemplate template = new JmsTemplate(factory);
        template.setExplicitQosEnabled(true);    // 메시지 전송 시 QOS을 설정
        template.setDeliveryPersistent(false);   // 메시지의 영속성을 설정
        template.setReceiveTimeout(1000 * 3);    // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
        template.setTimeToLive(1000 * 60 * 30);  // 메시지의 유효 기간을 설정(30분)

        return template;
    }
}

@EnableConfigurationProperties 어노테이션을 활용하여 ActiveMQProperties 객체를 바인딩해주었습니다


MessageConverter 등록

Java 객체JSON 형태로 변환하기 위해서는 Converter를 등록해야합니다.
ActiveMQConfig 클래스에 다음과 같이 Converter를 추가해줍니다.

@Bean
public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter =
            new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");
	
    // VO 타입 추가하기
    Map<String, Class<?>> typeIdMappings = new HashMap<>();
    typeIdMappings.put("chat", ChatMessage.class);

     essageConverter.setTypeIdMappings(typeIdMappings);

    return messageConverter;
}

추가한 Converter를 JmsTemplate에 추가해줍니다.

@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory, 
								MappingJackson2MessageConverter messageConverter) {
    JmsTemplate template = new JmsTemplate(factory);
    template.setMessageConverter(messageConverter); // 메시지 Converter 설정
	
    ... (중략)
    
    return template;
}

3. Message 송수신

이제 간단한 Message 송수신을 구현해볼 차례입니다!

3.1 VO 구현

우선, 메시지를 VO 객체로 구현합니다.
저는 Chatting Message를 구현했습니다.

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
    private String userId;
    private String msg;

    @JsonSerialize(using = CustomLocalDateTimeSerializer.class)
    @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
    private LocalDateTime date;
}

LocalDataTime 처리

직렬화, 역직렬화 예시를 보여주기 위해 VO 객체 필드에 많이 사용하는 LocalDateTime 객체를 추가했습니다.

LocalDateTime 객체는 json 형태로 자동 변환을 지원하지 않아서 직렬화 및 역직렬화를 직접 구현해야합니다.

// 직렬화 코드
@JsonComponent
public class CustomLocalDateTimeSerializer extends StdSerializer<LocalDateTime> {
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    public CustomLocalDateTimeSerializer() {
        this(null);
    }

    protected CustomLocalDateTimeSerializer(Class<LocalDateTime> t) {
        super(t);
    }

    @Override
    public void serialize(LocalDateTime localDateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
        jsonGenerator.writeString(localDateTime.format(DATE_TIME_FORMATTER));
    }
}

// 역직렬화 코드
@JsonComponent
public class CustomLocalDateTimeDeserializer extends StdDeserializer<LocalDateTime> {
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    public CustomLocalDateTimeDeserializer() {
        this(null);
    }

    public CustomLocalDateTimeDeserializer(Class<?> vc) {
        super(vc);
    }

    @Override
    public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
        String dateString = node.asText();
        return LocalDateTime.parse(dateString, DATE_TIME_FORMATTER);
    }
}

3.2 Destination 관리

현업에서는 메세지 프로토콜이 이미 정의되어 있기 때문에 Destination (Queue, Topic) 등을 String으로 보낼 필요가 없습니다.

properties 설정

application.yml 파일에 사용할 Queue와 Topic의 이름을 관리합니다.

# 파일명: application.yml

activemq:
  queue:
    chat: chat
  topic:
    chat: chat

환경설정 클래스 수정

환경설정 클래스(ActiveMQConfig)에 Bean을 추가하여 해당 Destination을 싱글톤 객체로 관리해줍니다

@Bean
public ActiveMQQueue chatQueue(@Value("${activemq.queue.chat}") String queueName) {
    return new ActiveMQQueue(queueName);
}

@Bean
public Topic chatTopic(@Value("${activemq.topic.chat}") String topicName) {
    return new ActiveMQTopic(topicName);
}

3.3 메세지 보내기

인터페이스

Topic과 Queue를 보내는 간단한 인터페이스입니다.

public interface ChattingService {
    boolean send(Destination destination, ChatMessage message);
    boolean sendQueue(ChatMessage message);
    boolean sendTopic(ChatMessage message);
}

구현

JmsTemplate을 이용하여 간편하게 구현이 가능합니다.
이미 정의한 Queue, TopicAutowiring 하여 메시지를 전송합니다.

@Service
@RequiredArgsConstructor
@Slf4j
public class ChatServiceImpl implements ChatService {
    private final JmsTemplate template;
    private final Queue chatQueue;
    private final Topic chatTopic;

    @Override
    public boolean send(Destination destination, ChatMessage message) {
        try {
            template.convertAndSend(destination, message);

            return true;
        } catch (JmsException exception) {
            log.error("JMS Exception 발생: {}", exception.getMessage());
            exception.getStackTrace();
            return false;
        }
    }

    @Override
    public boolean sendQueue(ChatMessage message) {
    	log.info("[Queue] send message: {}", message);
        return send(chatQueue, message);
    }

    @Override
    public boolean sendTopic(ChatMessage message) {
    	log.info("[Topic] send message: {}", message);
        return send(chatTopic, message);
    }
}

3.4 메시지 받기

메시지 수신 컨테이너 등록

메시지를 수신받기 위해서는 메시지 수신 컨테이너를 등록해야합니다. 환경설정 클래스에서 JmsListenerContainerFactory 빈을 등록해줍니다.

@Bean
public JmsListenerContainerFactory<?> queueContainerFactory(
        ActiveMQConnectionFactory connectionFactory, MappingJackson2MessageConverter messageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter);
    return factory;
}

@Bean
public JmsListenerContainerFactory<?> topicContainerFactory(
       ActiveMQConnectionFactory connectionFactory,
       MappingJackson2MessageConverter messageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        
    // Topic 구조로 설정
    factory.setPubSubDomain(true);
        
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter);
    return factory;
}

메시지 리스너 구현

메시지 리스너를 구현하는 방법에는 두가지가 있습니다

  1. @JmsListener를 이용하여 메소드 단에서 구현
  2. MessageListener 인터페이스를 구현하여 DefaultMessageListenerContainer에 등록하여 클래스 단에서 구현

메소드 단에서 구현

@JmsListener 어노테이션을 이용하면 리스너를 손쉽게 구현할 수 있습니다

@Component
@Slf4j
public class ChatListener {
    @JmsListener(destination = "${activemq.queue.chat}", containerFactory = "queueContainerFactory")
    public void receiveQueue(ChatMessage message) {
        log.info("[Queue] receive message: {}", message);
    }

    @JmsListener(destination = "${activemq.topic.chat}", containerFactory = "topicContainerFactory")
    public void receiveTopic(ChatMessage message) {
        log.info("[Topic] receive message: {}", message);
    }
}

클래스 단에서 구현

또 다른 방법으로는 MessageListener를 구현하여 DefaultMessageListenerContainer에 등록하는 방법입니다.

먼저 리스너를 구현할 클래스를 생성하여 MessageListener 인터페이스를 implements 합니다.

@Component
@Slf4j
public class TopicListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        log.info("[Topic Object] receive message: {}", message);
    }
}

그런 다음 DefaultMessageListenerContainer 빈을 추가하여 구현한 MessageListener를 등록합니다.

@Bean
public DefaultMessageListenerContainer topicContainerFactory2(
        ActiveMQConnectionFactory factory, Topic topic, TopicListener listener) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.setDestination(topic);
    
    // 1. 구현한 클래스 등록
    container.setMessageListener(listener);
    
    // 2. 람다식 구현
    container.setMessageListener((MessageListener) message -> {
        log.info("[Topic - Lambda] receive message: {}", message);
    });

    return container;
}

꼭 클래스 파일을 생성하지 않아도 람다 익명 객체를 이용하여 메시지 리스너를 구현할 수도 있습니다.

메시지 리스너를 클래스로 구현할 때 한가지 아쉬운 점은 MessageListener가 Message 객체를 처리할 것을 요구하기 때문에 VO로 바로 받기 어렵다는 점입니다

Message 객체를 로깅해보면 아주 아주 많은 정보를 담고 있는 것을 볼 수 있습니다.

ActiveMQBytesMessage {
commandId = 5, responseRequired = false, 
messageId = ID:DESKTOP-05O71ON-63104-1720286743523-1:5:1:1:1, 
originalDestination = null, originalTransactionId = null, 
producerId = ID:DESKTOP-05O71ON-63104-1720286743523-1:5:1:1, 
destination = topic://TOPIC_CHAT, 
transactionId = null, 
deliveryTime = 0, expiration = 1720288545781, timestamp = 1720286745781, arrival = 0, 
brokerInTime = 1720286745782, brokerOutTime = 1720286745794,
correlationId = null, replyTo = null, persistent = false, 
type = null, priority = 4, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, 
content = org.apache.activemq.util.ByteSequence@656d5cea, 
marshalledProperties = org.apache.activemq.util.ByteSequence@38811fe, 
dataStructure = null, redeliveryCounter = 0, size = 0, 
properties = {_typeId=chat}, readOnlyProperties = true, 
readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} 
ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null 
}

Message To Object

이 Message를 parsing하기 위해서는 아래와 같이 구현이 가능합니다.

@Component
@Slf4j
public class TopicListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        log.info("[Topic Object] receive message: {}", message);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            ChatMessage body = null;
            try {
                body = textMessage.getBody(ChatMessage.class);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
            log.info("[Topic Object] receive body: {}", body);
        }
    }
}

4. Test 코드 작성

메시지 송수신이 잘 이루어지고 있는지 테스트 코드를 작성하여 확인하였습니다.

@SpringBootTest
class MqclientApplicationTests {
    private final ChatService chatService;

    @Autowired
    MqclientApplicationTests(ChatService chatService) {
        this.chatService = chatService;
    }

    @Test
    @DisplayName("Queue 테스트")
    void topicQueue() throws JMSException {
        ChatMessage message = ChatMessage.builder()
                .userId("twinklekhj")
                .msg("안녕하세요!")
                .date(LocalDateTime.now())
                .build();

        Assertions.assertTrue(chatService.sendQueue(message), "Queue 보내기 실패");
    }
    @Test
    @DisplayName("Topic 테스트")
    void topicTopic() throws JMSException {
        ChatMessage message = ChatMessage.builder()
                .userId("twinklekhj")
                .msg("좋은 하루 보내세요~")
                .date(LocalDateTime.now())
                .build();

        Assertions.assertTrue(chatService.sendTopic(message), "Topic 보내기 실패");
    }
}

테스트 로그는 다음과 같습니다!

테스트 화면

안녕하세요~ 좋은 하루보내세요😊🙌


💎 마치며

이미 ActiveMQ 클라이언트 프로그램을 완벽하게 구현했다고 생각했지만, 내용을 정리하면서 기존 코드가 얼마나 깔끔하지 못했는지 다시 한번 회고하게 되는 시간이었습니다😢

profile
Java, Spring 기반 풀스택 개발자의 개발 블로그입니다.

0개의 댓글