RabbitMQ 사용 시 겪은 에러 모음입니다. 아직 미해결된 문제도 있어 해당 문제에 대한 해결 방법도 같이 알려주시면 감사하겠습니다!
📖 publisher에서 object 타입을 메시지에 담아 보낼 때 consumer에서 발생하는 에러
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
TestAVo
로 보냄 → consumer에는 TestAVo
의 클래스가 없기 때문에 메시지에 들어 있는 object를 적합한 형태로 변환하지 못해 문제가 발생MSA에서 메시지를 주고 받을 때 공통으로 사용할 클래스 생성 => 여기서는 SagaEventMessage 클래스라는 custom class 사용
public class SagaEventMessage {
private String messageId;
private Map<String, Object> header = new HashMap<>();
private Object payload;
private String correlationId = "";
private Object correlationData = null;
public SagaEventMessage() {
this.messageId = UUID.randomUUID().toString();
}
public SagaEventMessage messageId(String messageId) {
this.messageId = messageId;
return this;
}
public SagaEventMessage header(Map<String, Object> header) {
this.header = header;
return this;
}
//..
public <T> T getPayloadAsType(Class<T> clazz) {
Gson gson = new Gson();
String payload = gson.toJson(this.payload);
return gson.fromJson(payload, clazz);
}
//..
@Override
public String toString() {
return "Message{" +
"messageId='" + messageId + '\'' +
", header=" + header +
", payload=" + payload +
", correlationId='" + correlationId + '\'' +
", correlationData=" + correlationData +
'}';
}
}
@Slf4j
@Component
@AllArgsConstructor
public class SagaEventPublisher {
private final String applicationName;
private final RabbitTemplate rabbitTemplate;
public void send(String routingKey, String replyKey, Object payload, String correlationId, Map<String, Object> header) {
try {
//parameter 값 정리 및 ack 여부 확인
};
SagaEventMessage sagaEventMessage = new SagaEventMessage().payload(payload).header(header).correlationId(id);
rabbitTemplate.convertAndSend("command." + applicationName, routingKey, sagaEventMessage);
} catch (AmqpException e) {
log.error("send AmqpException");
}
}
//메소드 오버로딩
public void send(String routingKey, Object payload, String correlationId, Map<String, Object> header) {
try {
//parameter 값 정리 및 ack 여부 확인
};
SagaEventMessage sagaEventMessage = new SagaEventMessage().payload(payload).header(header).correlationId(id);
rabbitTemplate.convertAndSend("command." + applicationName, routingKey, sagaEventMessage);
} catch (AmqpException e) {
log.error("send AmqpException");
}
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ServiceEventHandler {
private final Gson gson;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = 큐이름, durable = "true"),
exchange = @Exchange(value = 익스체인지이름, type = "fanout")
))
public void method1(SagaEventMessage message) {
String payload = message.getPayload().toString();
//SagaEventMessge에 object 타입으로 넣어둔 payload(이게 본체)
//얘를 받는 게 consumer의 목적 -> 사용할 수 있는 String 타입으로 변환해서 사용
}
2021-12-27 19:34:16.952 INFO 1780 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-12-27 19:34:16.982 INFO 1780 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#158e9f6e:0/SimpleConnection@6986f93e [delegate=amqp://username@127.0.0.1:5672/, localPort= 59240]
Spring boot 가동 시 rabbitMQ connection이 정상적으로 진행되지 않고 수 차례 시도하는 현상 발생 → 수차례 커넥션 시도 후 Broker not available; cannot force queue declarations during start: java.io.IOException 로그 출력
현재 겪고 있는 문제
service A에서 test.exchange.publishA로 메시지 발행 → 메시지를 받아서 읽어야 하는 얘들은 service B와 C → 그런데 한 놈만 받아서 읽음
예상 : test.exchange.publishA에 하나의 큐만 바인딩 되어 있음 → B 또는 C가 해당 큐에서 메시지를 가져와 처리 → 큐에 남은 메시지가 없기 때문에 남은 녀석은 가만히 있게 됨
⇒ 큐가 하나이기 때문에 해당 큐에서 누군가 메시지를 받아서 처리하면 다른 녀석은 큐가 비게 되면서 처리 하지 못하는 멍때리는 현상 발생
해결점 :
큐를 2개로 나눈다면? ⇒ 현재 exchange type은 direct → direct에서 2개의 큐가 바인딩되어 있으면 2개의 큐에 메시지가 모두 들어갈까? 아니면 하나의 큐에만 들어가는 걸까? → direct 메시지여도 2개의 큐가 바인딩 될 수 있음
(사진 출처 : https://www.rabbitmq.com/tutorials/tutorial-four-python.html)
retry:
enabled: true
initial-interval: 3s
max-attempts: 2
max-interval: 6s
multiplier: 2
retry 속성은 먹히지 않음 → why?
dead letter queue 구현 중 retry 설정 값 수정을 통해 동작해야 하는데 이 부분에서 동작하지 않고 있음