[RabbitMQ] Exchange - fan out, direct

Kim Dae Hyun·2022년 2월 16일
0

RabbitMQ

목록 보기
1/5
post-custom-banner

Exchange 는 자신에게 바인딩 된 Queue에 대해 ExchangeType에 따라 메시지를 전달하는 역할을 수행한다.

Exchange Type은 4가지인데 그 중 2가지를 알아보고 직접 사용해보자.

  • fan out
  • direct

📌 fan out

fan out브로드케스팅과 동일하다.

fan out 타입으로 생성된 Exchange는 자신에게 바인딩 된 모든 Queue로 메시지를 전달한다.

x.Ex라는 이름으로 Exchangefan out 타입으로 생성했다고 가정하고 x.Ex.a, x.Ex.b 두 개 큐를 바인딩 시켰다고 해보자.

아래와 비슷한 형태이다.

x.Ex로 10개 메시지가 전달됐다.

이 10개 메시지는 x.Ex.a로 10개, x.Ex.b로 10개 전달된다.
Queue를 구독하고 있는 Consumer는 그대로 가져다 쓰면 된다.

코드로 간단하게 구현해보자.

📌 fan out 테스트

Exchange 생성

name: x.Ex
Type: fanout

두 개 Queue 생성

name: x.Ex.a
Type: Classic

name: x.Ex.b
Type: Classic


생성된 두 개 Queue를 Exchange에 바인딩

바인딩 된 모든 Queue로 전달되기 때문에 바인딩 된Queue를 구분하기 위해 사용되는 Routing key는 설정하지 않는다.

테스트를 위한 Producer 작성

전달받은 message를 그대로 x.Ex Exchange로 전달하고 있다.
convertAndSend의 두번째 파라미터는 routing key로 브로드케스팅에 해당하는 fan out에는 사용되지 않는다. (null)

@Service
public class TestProducer {

    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final RabbitTemplate rabbitTemplate;

    public TestProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        logger.info("producer message = {}", message);
        rabbitTemplate.convertAndSend("x.Ex", null, message);
    }
}

Producer 실행 테스트

구현한 Producerfan out으로 제대로 동작하는지 테스트한다.

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}

	private final TestProducer testProducer;

	public ProducerApplication(TestProducer testProducer) {
		this.testProducer = testProducer;
	}

	@Override
	public void run(String... args) throws Exception {
		for(int i=0;i<10;i++) {
			testProducer.sendMessage("message" + i);
		}
	}
}

10개 메시지를 Producer에 전달하므로 x.Ex에 바인딩 된 모든 queue로 10개 메시지가 전달되어야 한다.


실행결과 두 큐에 10개식 메시지가 큐잉된 것을 확인할 수 있다.
이제 Consumer를 구현하고 두 큐에서 메시지를 정상적으로 받아올 수 있는지 확인한다.

테스트를 위한 Consumer 구현

큐에서 메시지를 받아와서 로그를 찍는 단순한 Consumer이다.

@Service
public class TestAConsumer {

    private static final Logger logger = LoggerFactory.getLogger(TestAConsumer.class);

    @RabbitListener(queues = {"x.Ex.a"})
    public void listener(String message) {
        logger.info("x.Ex.a message={}", message);
    }
}
@Service
public class TestBConsumer {

    private static final Logger logger = LoggerFactory.getLogger(TestBConsumer.class);

    @RabbitListener(queues = {"x.Ex.b"})
    public void listener(String message) {
        logger.info("x.Ex.b message={}", message);
    }
}

Consumer에 해당하는 서비스를 구동했을 때 총 20개 메시지가 로그로 찍혀야 한다.

Start!!

x.Ex에 바인딩 된 두 큐에서 10개씩 메시지를 읽은 결과이다.

fan out -> 브로드케스팅
Exchange에 바인딩 된 모든 Queue로 메시지를 전달한다.


📌 Direct

Direct 타입으로 된 ExchangeQueue를 바인딩 하려면 Routing key 값을 설정해줘야 한다.

바인딩 된 QueueRouting key와 매칭되는 Queue로만 메시지를 보내기 위함이다.

바인딩 된 모든 Queue로 메시지를 보내는 fan out 과는 다르다.

이전과 비슷한 방식으로 Direct의 동작을 테스트한다.

Exchange 생성

name: x.Ex
Type: direct

두 개 Queue 생성

name: x.Ex.a
Type: Classic

name: x.Ex.b
Type: Classic

생성된 두 개 Queue를 Exchange에 바인딩

바인딩 시 각 Queue를 식별할 수 있는 routing key를 설정한다.

message로 전달할 Dto 작성

이번에는 java object를 JSON 포멧으로 바꿔서 전달한다.

이를 위한 Dto 클래스를 하나 작성한다.
type에 따라서 어떤 queue로 메시지를 전달할지 결정할 것이다.

public class TestDto {

    private String name;
    private String type;

    public TestDto() {
    }

    public TestDto(String name, String type) {
        this.name = name;
        this.type = type;
    }
    
    // getter, setter, toString 생략...
}    

Producer 작성

ObjectMapper를 이용해서 obejct를 Json형태로 변환한다.

convertAndSend 에서 dto의 type을 이용해서 routing key를 설정하고 있다. 외부에서 dto의 type을 세팅해서 넘겨줄 때 keyA, keyB 두 값만 넘어온다고 가정한다.

@Service
public class TestProducer {

    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper mapper;

    public TestProducer(RabbitTemplate rabbitTemplate, ObjectMapper mapper) {
        this.rabbitTemplate = rabbitTemplate;
        this.mapper = mapper;
    }

    public void sendMessage(TestDto dto) throws JsonProcessingException {
        logger.info("producer message = {}", dto);
        String dtoJson = mapper.writeValueAsString(mapper);
        
        rabbitTemplate.convertAndSend("x.Ex", dto.getType(), dtoJson);
    }
}

Producer 동작 테스트

구현한 Producer가 의도한 대로 동작하는지 테스트한다.

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}

	private final TestProducer testProducer;

	public ProducerApplication(TestProducer testProducer) {
		this.testProducer = testProducer;
	}

	@Override
	public void run(String... args) throws Exception {
		for(int i=0;i<5;i++) {
			testProducer.sendMessage(new TestDto("name" + i, "keyA"));
		}
		for(int i=0;i<5;i++) {
			testProducer.sendMessage(new TestDto("name" + i, "keyB"));
		}
	}
}

typekeyA인 메시지 5개와 keyB인 메시지 5개를 생성하고 있다.

x.Ex.ax.Ex.b에 각각 5개의 메시지가 들어가야 한다.


실행결과 각 큐에 5개씩 메시지가 큐잉된 것을 확인할 수 있다.

Consumer 테스트

이전과 동일한 Consumer 클래스를 빈으로 등록하고 실행하면 각자의 큐에서 5개의 메시지를 읽어올 것이다.

direct -> routing key로 queue를 구분
routing key와 매칭되는 queue에만 메시지를 전달한다.

profile
좀 더 천천히 까먹기 위해 기록합니다. 🧐
post-custom-banner

0개의 댓글