[스프링 인 액션] 8.비동기 메시지 전송하기

김하영·2021년 7월 28일
1
  • 이 장에서 배우는 내용
    비동기 메시지 전송
    JMS, RabbitMQ, 카프카를 사용해서 메시지 전송하기
    브로커에서 메시지 가져오기
    메시지 리스닝하기

이전 장에서는 REST를 사용한 동기화 통신을 알아보았다.
그러나 이것만이 개발자가 사용할 수 있는 애플리케이션 간의 통신 형태는 아니다.

비동기 메시징은 에플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법이다.

비동기 메시징을 사용하는 방법 중에는 스프링이 제공하는 다음의 비동기 메시징을 고려할 수 있다.
바로 JMS / RabbitMQ / AMQP / Apache Kafka다.

스피링의 메시지 기반 POJO 지원에 관래 알아볼 것이다. 이것은 EJB의 MDB(Message Driven Bean)와 유사하게 메시지를 수신하는 방법이다.

  • POJO : 자바의 장점을 살리는 '오래된' 방식의 '순수한' 자바객체

"진정한 POJO란 객체지향적인 원리에 충실하면서, 환경과 기술에 종속되지 않고 필요에 따라 재활용될 수 있는 방식으로 설계된 오브젝트를 말한다." - 토비의 스프링

  • EJB : 기업환경의 시스템을 구현하기 위한 서버측 컴포넌트 모델이다.

즉, EJB는 애플리케이션의 업무 로직을 가지고 있는 서버 애플리케이션이다.

EJB 사양은 Java EE의 자바 API 중 하나로, 주로 웹 시스템에서 JSP는 화면 로직을 처리하고 EJB는 업무 로직을 처리하는 역할을 한다.

  • EJB에는 다음 3가지 종류가 있다.
  1. 세션 빈 (Session Bean) : DB 연동이 필요 없음
  2. 엔티티 빈 (Entity Bean)
    데이터베이스의 데이터를 관리하는 객체
    Insert(삽입), Update(수정), Delete(삭제), Select(조회)
    DB 관련 쿼리는 자동으로 만들어지고 개발자는 고급 업무 처리에 집중할 수 있음
    DB가 수정되면 코드 수정 없이 다시 배포(설정 문서 만들어서 복사)
  3. 메시지 구동 빈 (Message-driven Bean) : JMS로 빈을 날려줌

8.1 JMS로 메세지 전송하기

JMS란?

두 개 이상의 클라이언트 간에 메세지 통신을 위한 공통 API를 정의하는 자바 표준이다.

2001년에 처음 소개되었으며, JMS가 나오기 전에는 클라이언트 간에 메시지 통시을 중개하는 메세지 브로커들이 천편일률적 API를 가지고 있어 호환이 어려웠다.

스프링은 JmsTemplate 기반의 클래스를 통해 JMS를 지원하고, JmsTemplate을 사용하면 프로듀서가 큐(또는 토픽)에 메세지를 전송하고 컨슈머는 그 메세지들을 받을 수 있다.

스프링은 메세지 기반의 POJO는 큐 나 토픽에 도착하는 메세지에 반응하여 비동기 방식으로 메세지를 수신하는 간단한 자바 객체의 의미로 사용된다.

8.1.1 JMS 설정하기

JMS를 사용하기 위해서는 JMS 클라이언트를 프로젝트에 추가하기만 하면 된다.

메세지를 전달해 줄 브로커를 골라야 한다. (ActiveMQ or ActiveMQ Aretemis)

선택해서 의존성을 추가해보자!

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis<artifactId>
</dependency>

Aretemis 브로커를 결정했다면 몇가지 properties 를 설정해야 한다.

spring:
	artemis:
    host: artemis.tacocloud.com
    port: 61617
    user: tacoweb   //선택사항
    password: l3tm31n  //선택사항

프로젝트에 JMS를 추가 했고, 브로커가 한 어플리케이션에서 다른 어플리케이션으로 메세지를 전달하기 위해 대기중이다. 전송 시작할 준비가 되었다.

8.1.2 JmsTemplate을 사용해서 메세지 전송하기

JmsTemplate

JmsTemplate은 JMS로 작업하는데 필요한 코드를 줄여준다.
JmsTemplate은 메세지 브로커와 연결 및 세션을 생성하는 코드와 메세지를전송하는 도중 발생할 수 있는 예외를 처리하는 코드도 구현이 되어있다. 때문에 우리는 메세지 전송 작업에만 집중하면 된다.

send와 convertAndSend가 메세지 전송을 하는 메소드인데

  • send는 원시 형태의 메세지를 전송한다.

  • convertAndSend는 객체를 받아서 변환을 하고, 전송하기 전에 MessagePostProcessor로 추가로 후처리까지 한 후에 메세지를 전송한다.

void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;

void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;

void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
  • send는 Message 객체를 생성하기 위해 MessageCreator 인자로 받는다.
  • converAndSend는 Object 타입을 인자로 받아 내부적으로 Message 타입으로 변환한다.
  • 메세지를 전송하기 전에 커스터마이징을 할 수 있도록 MessagePostProcessor 또한 인자로 받는다.
  • 목적지에 대한 아무런 인자를 받지 않은 경우: 해당 메세지를 미리 정해둔 기본 도착지로 전송한다.
  • Destination 객체나 String 객체를 이용해 목적지를 설정할 수 있다.

기본형 send(MessageCreator) 를 사용해 주문 데이터 전송하기

package tacos.messaging;

import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import tacos.Order;

@Service
public class JmsOrderMessagingService implements OrderMessagingService {

  private JmsTemplate jms;

  @Autowired
  public JmsOrderMessagingService(JmsTemplate jms) {
    this.jms = jms;
  }

	// 익명 내부 클래스를 인자로 전달한 예시1
  @Override
  public void sendOrder(Order order) {
    jms.send(new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
						return session.createObjectMessage(order);
			}
  }

	// 람다를 사용해서 더 간단히 표현한 예시2
	@Override
  public void sendOrder(Order order) {
    jms.send(session -> session.createObjectMessage(order));
			}
  
}

예시 1, 2 모두 MessageCreator 인터페이스를 구현해서 send에 인자로 넘기고 있다.

MessageCreator는 함수형 인터페이스이기 때문에 람다로 나타낼 수 있다.
아까 destination 을 설정하지 않은 메소드들은 메세지를 기본 목적지로 보낸다고 했는데 이는 jms의 properties 중 하나를 설정한 것이다.

spring:
  jms:
    template:
      default-destination: tacocloud.order.queue

이처럼 기본 도착지를 지정해서 사용하는 것이 가장 쉬운 방법이다.
도착지 이름을 한 번만 지정하면 코드에서는 메세지가 전송되는 곳을 매번 신경쓰지 않고 전송만 하면 되기 때문이다.

기본 목적지로 메세지를 보내는게 아니라면 send()메소드에서 destination(Destination 객체 또는 String 형태)를 인자로 넘겨줘야 한다.

메세지 변환하고 전송하기 (convertAndSend)

앞에서 말했던 것처럼 convertAndSend는 객체를 인자로 넘겨주기만 하면 Message 객체로 변환 후 전송이 된다.

도착지를 선정하는 방법은 send 메소드와 동일하다.

Message 객체로 변환하는 일은 spring에 정의된 MessgeConverter 인터페이스 구현체가 사용된다.


// convertAndSend 사용 예시
@Override
 public void sendOrder(Order order) {
    jms.convertAndSend("tacocloud.order.queue", order);
  }
public interface MessageConverter {
        Message toMessage(Object object, Session session) throws JMSException, MessageCOnversionException;
        
        Object fromMessage(Message message);
    }
  • MappingJackson2MessageConverter: 메세지를 Json 으로 상호 변환
  • MarshallingMessageConverter: 메세지를 XML로 상호 변환
  • MessagingMessageConverter: 메세지를 Message 객체로 상호 변환
  • SimpleMessageConverter: 문자열을 TextMessage로, byte 배열을 ByteMessage로, Map을 MapMessage로, Serializable 객체를 ObjectMessage 로 상호 변환

후처리 메세지

수익성 좋은 웹 비즈니스 모델에는 항상 추가 요청사항이 오기 마련이다.

만약 예시의 타코 서비스가 오프라인 형태에서 온라인 형태로 사업을 확장한다고 해보자.
그럼 사람들은 1. 가게에서 직접, 2. 웹사이트 를 통해서 타코를 주문할 수 있다.
그럼 주문 메세지가 어디서 작성을 했는지 판별이 가능해야 한다.

이럴때는 Order 클래스의 객체를 (주문하는 쪽/ 주문받는 쪽 aka 주방) 양쪽에서 고치는 것보다 커스텀 헤더를 메세지에 추가하는게 좋다.

어떻게 메세지에 커스텀 헤더를 넣는 후처리를 해줄까?

send()의 경우

비교적 간단하게 전송 전에 Message 객체의 setStringProperty() 를 호출하면 된다.

@Override
    public void sendOrder(Order order) {
        jms.send(
                "tacocloud.order.queue",
                session -> {
                    Message message = session.createObjectMessage(order);
                    message.setStringProperty("X_ORDER_SOURCE", "WEB");
                });
        
    }

converterAndSend()의 경우

우리가 직접 Message 로 변환하는게 아니다 보니 Message 객체에 직접 접근을 할 수 없다.

이럴때는 converterAndSend()의 마지막 인자인 MessagePostProcessor 를 전달하면 Message 객체가 생성된 후 이 객체에 우리가 필요한 후처리를 할 수 있다.

@Override
    public void sendOrder(Order order) {
        jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {});
        @Override
        public Message postProcessMessage(Message message) throw JMSException {
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
            return message;
        }
    }

8.1.3 JMS 메세지 수신하기

메세지 수신 방법

  • 풀 모델(pull model): 메세지를 요청하고 요청한 메세지가 도착할 때 까지 기다린다.
  • 푸시 모델(push model): 메세지를 요청하고, 실제로 수신 가능한 상태가 되면 메세지를 전달 받는다.

JmsTemplate의 경우 모든 메소드가 풀 모델을 사용한다.
따라서 메소드를 하나 호출해서 메세지를 요청하면 스레드에서 메세지를 수신할 수 있을 때 까지 한가하게 기다린다.

푸시 형식을 사용하려면 메세지 리스너를 정의해야 한다.
두 모델 모두 가능하나, 스레드 실행을 막지 않는다는 이유로 일반적으로는 푸시 모델이 좋은 선택이다. (많은 메세지가 너무 빨리 도착한다면 과부하가 걸릴 수는 있다).

JmsTemplate을 사용해서 메세지 수신하기

Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;

Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
  • JmsTemplate의 send()와 convertAndSend() 메소드에 대응된다.

  • receive 는 원시타입 메세지를 수신, convertAndSend 는 메세지를 도메인 타입으로 변환하기 위해 구성된 메세지 변환기를 사용한다.

  • 마찬가지로 String 또는 Destination 으로 도착지를 설정할 수 있다. 단, 여기서 도착지는 메세지를 가져오는 곳이다.

푸시 모델 처리를 위한 메세지 리스너 선언하기

풀모델이 receive()나 receiveAndConvert()를 호출하는 것과 다르게 푸시 모델의 메세지 리스너는 메세지가 도착할 때 까지 대기하는 수동적 컴포넌트다.

JMS 메세지에 반응하는 메세지 리스트를 생성하기 위해서는 @JmsListener 어노테이션을 사용하면 된다.

JMS는 표준 바자 명세에 정의되어 있고 여러 브로커에서 지원되지만 자바 어플리케이션에서만 사용할 수 있다는 단점이 있다.

JVM 외의 다른 플렛폼에서도 사용할 수 있는 RabbitMQ와 카프카를 알아보자.

8.2 RabbitMQ와 AMQP 사용하기

AMQP가 구현한 RabbitMQ 는 JMS 보다 향상된 메세지 라우팅 전략을 제공한다.

JMS 메세지는 수신자가 가져갈 메세지 도착지의 이름을 주소로 사용했는데
RabbitMQ 메세지는 수신자가 리스닝하는 큐와 분리되어 있는 거래소 이름과 라우팅 키를 주소로 한다.

메세지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소에 들어간다.
거래소는 하나 이상의 큐에 메세지를 전달할 책임이 있으며 처리 기반은 거래소의 타입, 거래소와 큐 간의 바인딩, 메세지의 라우팅 키 값이다.

거래소 종류

  • 기본: 브로커가 자동으로 생성하는 거래소. 해당 메세지의 라우팅 키와 이름이 같은 큐로 메세지를 전달한다. 또한 모든 큐는 자동으로 기본 거래소와 연결된다.
  • 디렉트: 바인딩 키가 해당 메세지의 라우팅 키와 같은 큐에 메세지를 전달한다.
  • 토픽: 바인딩 키(와일드 카드 포함)가 해당 메세지의 라우팅 키와 같은 큐들에 메세지를 전달한다.
  • 팬아웃:바인딩 키, 라우팅 키와 상관없이 연결된 모든 큐에 메세지를 전달한다.
  • 헤더: 토픽과 유사하나 라우팅 키 대신 메세지 헤더 값을 기반으로 한다.
  • 데드 레터: 전달이 불가능한 메세지들을 모아두는 쓰레기장

간단한 형태인 기본과 팬아웃 거래소는 JMS의 큐 형태와도 유사하다.
그러나 다른 거래소를 사용하면 더 유연한 라우팅 스킬을 정의할 수 있다.

메세지는 라우팅 키를 갖고 거래소로 전달되고 큐에서 읽혀져 소비된다는 것을 이해하는 것이 가장 중요하다!

메세지는 바인딩 정의를 기반으로 거래소로부터 큐로 전달된다.

스프링 어플리케이션에서 메세지를 전송하고 수신하는 방법은 사용하는 거래소 타입과 무관하며, 거래소와 큐의 바인딩을 정의하는 방법과도 관계가 없다. (스프링 단에서 거래소 관련 설정이 없다). RabbitMQ를 사용해서 메세지를 전송/수신하는 코드를 작성해야 한다.

8.2.1 RabbitMQ를 스프링에 추가하기

JMS 예제에서 Artemis를 프로젝트에 추가한 것 처럼 이번에는 RabbitMQ를 구현한 AMQP를 추가해주면 된다.
프로젝트에 빌드를 하면 메세지 전송을 가능하게 해주는 RabbitTemplate을 사용할 수 있다.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Artemis 의 Properties를 지정했던 것처럼 rabbitmq를 위한 properties를 지정해주자.
형태는 Artemis와 유사하다.
개발환경에서는 properties 지정이 큰 상관이 없으나 운영 환경으로 변경할 때 유용하니 설치 초반에 꼭 설정해주자.

spring:
  profiles: prod
  rabbitmq:
    host: rabbit.tacocloud.com
    port: 5673
    username: tacoweb
    password: l3tm31n

RabbitTemplate을 사용해서 메세지 전송하기

abbitTemplate도 JMSTemplate과 유사한 send()와 convertAndSend()를 사용하나, JmsTemplate이 지정된 큐에만 메세지를 전송했다면 RabbitTemplate은 거래소와 라우팅 키를 이용해서 메세지를 전송한다.

void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
  • send 메세지는 원시 형태의 Message를 전송, convertAndSend는 내부 변환 후 Message 전송 (JmsTemplate과 동일)
  • convertAndSend에 MessagePostProcessor 를 인자로 넘겨 후처리를 한다. (JmsTemplate과 동일)
  • Destination 객체나 String으로 도착지 정보를 받았던 JmsTemplate과는 다르게 String routingKey, String exchange로 라우팅 키와 거래소를 지정하는 문자열을 인자로 받는다.
  • 라우팅과 거래소를 지정하지 않았다면 기본 라우팅과 거래소가 사용된다. properties 설정을 통해 기본으로 사용되는 라우팅 키와 거래소를 바꿀 수 있다.
spring:
  rabbitmq:
    template:
      exchange: tacocloud.orders
      routing-key: kitchens.central

8.3 카프카 사용하기

JMS와 RabbitMQ는 약간 구형의 기술 모델이다.

아파치 카프카는 가장 새로운 메세징 시스템이고, RabbiMQ(ActiveMQ, Artemis)와 유사한 메세지 브로커지만 특유의 아키텍처를 가지고 있다.

  • 높은 확장성을 제공하는 클로스터로 실행되도록 설계되어있다.
  • 클러스터의 모든 카프카 인스턴스의 토픽을 파티션으로 분할하여 메세지를 관리한다.
  • 카프카는 큐, 거래소를 사용하지 않고 오직 토픽만 사용한다.
  • 카프카 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다.
  • 클러스터의 각 노드는 하나 이상의 토픽에 대한 리더(leader)로 동작한다. 하는 역할로는 토픽 데이터를 관리하고, 클러스터의 다른 노드로 데이터를 복제한다.
  • 주키퍼는 분산 어플리케이션을 위한 코디네이션 시스템이다.
    분산되어있는 각 어플리케이션의 정보를 중앙에 집중하고 수정, 관리, 그룹 관리, 동기화 등을 한다.

8.3.1 카프카 사용을 위한 스프링 설정

스프링에서 카프카를 사용하기 위해서 의존성을 빌드에 추가해주자!

ActiveMQ와 RabbitMQ와는 다르게 spring-boot-starter 시리즈에 들어가있지 않다.

ActiveMQ(JMSTemplate), RabbitMQ(RabbitTemplate)처럼 빌드에 추가하면 KafkaTemplate을 메세지 전송/수신에 사용할 수 있다.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

카프카를 사용하기 전에 운영환경에서 사용하거나, 사용을 편리하게 해주는 카프카 속성을 properties에 설정해두자.

spring:
  kafka:
    bootstrap-servers:
    - kafka.tacocloud.com:9092

bootstrap-servers에는 카츠카 클러스터 초기 연결에 사용되는 하나 이상의 카프카 서버들의 위치를 설정한다.
여러가지 서버에서 받아온다면 줄 지어서 써주면 된다. bootstrap-servers는 리스트를 받는다.

spring:
  kafka:
    bootstrap-servers:
    - kafka.tacocloud.com:9092
    - kafka.tacocloud.com:9093
    - kafka.tacocloud.com:9094

8.3.2 KafkaTemplate을 사용해서 메세지 전송하기

이전 장에서 나왔던 Template들과는 사용하는 메서드가 살짝 다르다.

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
  • convetAndSend() 메서드가 없다!
    KafkaTemplate은 제네릭 타입을 사용하고, 메세지를 전송할 때 직접 도메인 타입을 처리할 수 있기 때문에 모든 send() 메서드가 convertAndSend() 기능을 가지고 있다.

  • send()와 sendDefault()에서 많은 매개변수들을 받고 있는데 이는 카프카에서 메세지를 전할때 메세지가 전송되는 방법을 알려주는 매개변수를 전달하기 때문이다.

  • 메세지가 전송될 토픽

  • 토픽 데이터를 사용하는 파티션(optional), 레코드 전송 키 (optional), 타임스탬프 (optional)

  • 페이로드
    → 토픽과 페이로드는 가장 중요한 매개변수이다. 파티션과 키는 send()와 sendDefault()에 매개변수로 제공되는 추가 정보이고 Template 사용에는 거의 영향을 주지 않는다.

→ ProducerRecord를 전송하는 send() 메서드도 있는데 이는 모든 선행하는 매개변수들을 하나의 객체로 담은 것이다.

→ Message 객체를 전송하는 send()도 있는데 이 경우는 예전처럼 우리 도메인 객체를 message 객체로 변환해야 한다.

→ 객체를 추가로 변환해야 하는 ProducerRecord와 Message보다 다른 것을 사용하는게 좋다.

@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("tacocloud.orders.topic", order);
    }

		// 기본으로 설정된 토픽으로 보내기 때문에 따로 명시를 안 했다. 
		@Override
		public void sendOrder(Order order) {
	    kafkaTemplate.sendDefault(order);
		}
}

→ tacocloud.orders.topic 이라는 이름의 토픽으로 order 객제가 전송되었다.

→ 2번째 sendOrder처럼 기본 토픽을 사용하려면 properties에 설정해두면 된다.

spring:
  kafka:
    template:
      default-topic: tacocloud.orders.topic

8.3.3 카프카 리스너 작성하기

이전의 JmsTemplate이나 RabbitTemplate은 receive와 receiveAndConvert등 메세지를 수신해주는 코드를 지원해줬지만 Kafka는 아무런 수신 코드를 제공해주고 있지 않다.

메세지 수신의 유일한 방식은 @KafkaListener 로 리스너를 작성해야 한다. 메세지가 오면 자동으로 메서드를 호출하는 푸시 방식으로만 메세지를 받을 수 있는 것이다.


@Component
public class OrderListener {
    private KitchenUI ui;
    
    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }
    
    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}

이 예제에서는 페이로드인 Order만 받고 있지만 필요하다면 ConsumerRecord나 Message 객체를 인자로 받을 수 있다.

밑의 예제는 받은 메세지가 있던 파티션과 타임스탬프를 로그에 찍기 위해 ConsumerRecord를 넘겼다.

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
    log.info("Received from partition {} with timestamp {}",
             record.partition(), record.timestamp());
    ui.displayOrder(order);
}

CosumerRecord말고 Message 객체를 넘겨도 같은 일을 할 수 있다.

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
             headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
             headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
}

위 두 예제는 handle의 매개변수로 직접 Order를 요청했지만 ConsumerRecord와 Messge 객체를 넘겨줬다면 ConsumerRecord.value() 와 Message.getPayload() 를 사용해도 받을 수 있다.

요약

  • 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높인다.
  • 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시징을 지원한다.
  • 스프링 애플리케이션은 템플릿 기반의 클라이언트인 JmsTemplate, RabbitTemplate 또는 KafkaTemplate을 사용해서 메시지 브로커를 통한 메시지를 전송할 수 있다.
    ?? 질문 : 이번 카프카 전환 소스는 어디를 보면 알 수 있나요? (분석하고싶어졌습니다~)
  • 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 풀 모델 형태의 메시지 소비(가져오기)를 할 수 있다.
  • 메시지 리스너 애노테이션인 @JmsListener, @RabbitListener, @KafkaListener를 빈 메서드에 지정하면 푸시 모델의 형태로 컨슈머에게 메시지가 전송될 수 있다.
profile
Back-end Developer

0개의 댓글