Exchange
는 자신에게 바인딩 된 Queue
에 대해 Exchange
의 Type
에 따라 메시지를 전달하는 역할을 수행한다.
Exchange Type
은 4가지인데 그 중 2가지를 알아보고 직접 사용해보자.
fan out
은 브로드케스팅
과 동일하다.
fan out
타입으로 생성된 Exchange
는 자신에게 바인딩 된 모든 Queue
로 메시지를 전달한다.
x.Ex
라는 이름으로 Exchange
를 fan out
타입으로 생성했다고 가정하고 x.Ex.a
, x.Ex.b
두 개 큐를 바인딩 시켰다고 해보자.
아래와 비슷한 형태이다.
x.Ex
로 10개 메시지가 전달됐다.
이 10개 메시지는 x.Ex.a
로 10개, x.Ex.b
로 10개 전달된다.
각 Queue
를 구독하고 있는 Consumer
는 그대로 가져다 쓰면 된다.
코드로 간단하게 구현해보자.
name: x.Ex
Type: fanout
name: x.Ex.a
Type: Classic
name: x.Ex.b
Type: Classic
바인딩 된 모든 Queue
로 전달되기 때문에 바인딩 된Queue
를 구분하기 위해 사용되는 Routing key
는 설정하지 않는다.
전달받은 message
를 그대로 x.Ex
Exchange로 전달하고 있다.
convertAndSend
의 두번째 파라미터는 routing key
로 브로드케스팅에 해당하는 fan out
에는 사용되지 않는다. (null)
@Service
public class TestProducer {
private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
private final RabbitTemplate rabbitTemplate;
public TestProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
logger.info("producer message = {}", message);
rabbitTemplate.convertAndSend("x.Ex", null, message);
}
}
구현한 Producer
가 fan out
으로 제대로 동작하는지 테스트한다.
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
private final TestProducer testProducer;
public ProducerApplication(TestProducer testProducer) {
this.testProducer = testProducer;
}
@Override
public void run(String... args) throws Exception {
for(int i=0;i<10;i++) {
testProducer.sendMessage("message" + i);
}
}
}
10개 메시지를 Producer
에 전달하므로 x.Ex
에 바인딩 된 모든 queue
로 10개 메시지가 전달되어야 한다.
실행결과 두 큐에 10개식 메시지가 큐잉된 것을 확인할 수 있다.
이제 Consumer
를 구현하고 두 큐에서 메시지를 정상적으로 받아올 수 있는지 확인한다.
큐에서 메시지를 받아와서 로그를 찍는 단순한 Consumer
이다.
@Service
public class TestAConsumer {
private static final Logger logger = LoggerFactory.getLogger(TestAConsumer.class);
@RabbitListener(queues = {"x.Ex.a"})
public void listener(String message) {
logger.info("x.Ex.a message={}", message);
}
}
@Service
public class TestBConsumer {
private static final Logger logger = LoggerFactory.getLogger(TestBConsumer.class);
@RabbitListener(queues = {"x.Ex.b"})
public void listener(String message) {
logger.info("x.Ex.b message={}", message);
}
}
Consumer
에 해당하는 서비스를 구동했을 때 총 20개 메시지가 로그로 찍혀야 한다.
Start!!
x.Ex
에 바인딩 된 두 큐에서 10개씩 메시지를 읽은 결과이다.
fan out -> 브로드케스팅
Exchange에 바인딩 된 모든 Queue로 메시지를 전달한다.
Direct
타입으로 된 Exchange
에 Queue
를 바인딩 하려면 Routing key
값을 설정해줘야 한다.
바인딩 된 Queue
중 Routing key
와 매칭되는 Queue
로만 메시지를 보내기 위함이다.
바인딩 된 모든 Queue
로 메시지를 보내는 fan out
과는 다르다.
이전과 비슷한 방식으로 Direct
의 동작을 테스트한다.
name: x.Ex
Type: direct
name: x.Ex.a
Type: Classic
name: x.Ex.b
Type: Classic
바인딩 시 각 Queue
를 식별할 수 있는 routing key
를 설정한다.
이번에는 java object를 JSON 포멧으로 바꿔서 전달한다.
이를 위한 Dto 클래스를 하나 작성한다.
type
에 따라서 어떤 queue
로 메시지를 전달할지 결정할 것이다.
public class TestDto {
private String name;
private String type;
public TestDto() {
}
public TestDto(String name, String type) {
this.name = name;
this.type = type;
}
// getter, setter, toString 생략...
}
ObjectMapper
를 이용해서 obejct를 Json형태로 변환한다.
convertAndSend
에서 dto의 type을 이용해서 routing key
를 설정하고 있다. 외부에서 dto의 type을 세팅해서 넘겨줄 때 keyA
, keyB
두 값만 넘어온다고 가정한다.
@Service
public class TestProducer {
private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper mapper;
public TestProducer(RabbitTemplate rabbitTemplate, ObjectMapper mapper) {
this.rabbitTemplate = rabbitTemplate;
this.mapper = mapper;
}
public void sendMessage(TestDto dto) throws JsonProcessingException {
logger.info("producer message = {}", dto);
String dtoJson = mapper.writeValueAsString(mapper);
rabbitTemplate.convertAndSend("x.Ex", dto.getType(), dtoJson);
}
}
구현한 Producer
가 의도한 대로 동작하는지 테스트한다.
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
private final TestProducer testProducer;
public ProducerApplication(TestProducer testProducer) {
this.testProducer = testProducer;
}
@Override
public void run(String... args) throws Exception {
for(int i=0;i<5;i++) {
testProducer.sendMessage(new TestDto("name" + i, "keyA"));
}
for(int i=0;i<5;i++) {
testProducer.sendMessage(new TestDto("name" + i, "keyB"));
}
}
}
type
이 keyA
인 메시지 5개와 keyB
인 메시지 5개를 생성하고 있다.
x.Ex.a
와 x.Ex.b
에 각각 5개의 메시지가 들어가야 한다.
실행결과 각 큐에 5개씩 메시지가 큐잉된 것을 확인할 수 있다.
이전과 동일한 Consumer 클래스를 빈으로 등록하고 실행하면 각자의 큐에서 5개의 메시지를 읽어올 것이다.
direct -> routing key로 queue를 구분
routing key와 매칭되는 queue에만 메시지를 전달한다.