메시지를 주고 받는 경우 데이터의 타입이 적절하지 않을 수도 있을 것입니다.
Spring web을 이용했던 경험이 있다면 Jakarta나 Javax의 Annotation을 이용해서 데이터 클래스의 필드에 조건을 걸어 두고, REST API의 핸들러 메소드의 파라미터에 @Valid를 이용하거나 Service 레이어 클래스에 @Validated와 같은 애노테이션을 붙여서 데이터의 검증을 하도록 한 애플리케이션을 본 적이 있을 수 있습니다.
spring cloud stream에서 메시지가 들어올 때 payload에 대해서 데이터를 검증할 필요가 있다면 어떻게 구현할 수 있을까요
현재는 애노테이션과 같이 편리한 방법은 어렵고 메시지를 처리하는 FunctionalInterface 내부의 로직에서 검증을 처리하도록 하는 방법이 있습니다.
이렇게 메시지의 데이터를 검증하고 타입이 적절하지 않을 때 DLQ(Dead Letter Queue)를 이용해서 에러 핸들링를 하는 과정을 진행해보고자 합니다.
@Valid와 같은 애노테이션을 활용한 검증 AOP를 추가하는 것은 @StreamListener를 이용했던 방식에서는 지원했지만 Functional 프로그래밍 방식에서는 따로 지원하지는 않습니다.
따라서 직접 Validator를 활용하도록 코드를 구현하여 작성할 수도 있습니다.
Spring context에서 제공하는 LocalValidateFactoryBean을 이용해서 검증하도록 Receiver의 receiverMessageHandler를 구현해 볼 수 있습니다.
receiver/src/main/java
com.github.questcollector.receiver
ReceiverMessageListener
package com.github.questcollector.receiver;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Function;
@Configuration
public class ReceiverMessageListener {
private final Logger log = LoggerFactory.getLogger(ReceiverMessageListener.class);
private final Validator validator;
public ReceiverMessageListener(Validator validator) {
this.validator = validator;
}
@Bean
public Function<MyMessage, MyMessage> receiverMessageHandler() {
return myMessage -> {
log.info("received message: " + myMessage.toString());
var violation = validator.validate(myMessage);
if (!violation.isEmpty()) {
throw new ConstraintViolationException(violation);
}
MyMessage newMessage = new MyMessage(
myMessage.id(), myMessage.content() + "2"
);
log.info("publish message to receiverPublish binding");
if (log.isDebugEnabled()) {
log.debug("payload: " + newMessage.toString());
}
return newMessage;
};
}
}
Validator의 validate() 메소드를 호출하면 필드 값이 조건에 부합하는지 검증하고, 조건에 합치되지 않은 사항의 개수만큼 ConstraintViolationImpl이 포함되어 있는 HashSet을 리턴합니다.
검증 결과가 적합하다면 아무 것도 들어있지 않은 HashSet을 리턴합니다.
이 HashSet을 담아서 ConstraintViolationException을 던지면 어떤 조건에 부합하지 않는지 메시지를 구성하여 예외를 던지게 됩니다.
조건은 jakarta의 애노테이션을 이용해서 구성할 수 있습니다.
MyMessage의 id 필드는 양수 값만 갖게 하고, content에는 빈 값이 들어갈 수 없도록 조건을 구성해 보도록 하겠습니다.
receiver/src/main/java
com.github.questcollector.receiver
MyMessage
package com.github.questcollector.receiver;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
public record MyMessage(
@Positive long id,
@NotBlank String content
) {}
이제 content에 값이 비어있거나 whitespace나 tab과 같은 기호만으로 구성된 문자열이 들어온다면 이후의 작업을 처리하지 않고 예외를 던지게 될 것입니다.
한편, 여러 개의 메시지 핸들러에서 validate 작업을 해야 한다면 AOP를 적용해 보는 것도 하나의 방법일 것입니다.
AOP를 적용하기 위해 receiver의 build.gradle에 spring-aop 라이브러리를 추가합니다.
receiver/build.gradle
...
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
//추가
implementation 'org.springframework.boot:spring-boot-starter-aop'
// spring cloud stream rabbitmq
implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit:4.0.4'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka:4.0.4'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder:4.0.4'
}
...
여러 가지 방식이 있을 수 있지만 Pointcut을 단순하게 잡기 위해 메시지 핸들러의 역할을 하는 FunctionalInterface를 Function, Consumer 등 java.util.function의 인터페이스를 상속하여 function 패키지 안에 만들어 보도록 하겠습니다.
receiver/src/main/java
com.github.questcollector.receiver.function
MessageHandlerFunction
package com.github.questcollector.receiver.function;
import java.util.function.Function;
@FunctionalInterface
public interface MessageHandlerFunction<T, R> extends Function<T, R> {}
데이터 검증 및 예외를 던지는 PayloadValidationAspect 클래스를 생성합니다.
MessageHandlerFunction 패키지가 들어 있는 com.github.questcollector.receiver.function 패키지에 들어 있는 함수형 인터페이스의 메소드가 실행되면 validatePayload() 메소드가 실행되어 payload의 검증을 수행하도록 구성합니다.
receiver/src/main/java
com.github.questcollector.receiver
PayloadValidationAspect
package com.github.questcollector.receiver;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Component
@Aspect
public class PayloadValidationAspect {
private final Validator validator;
public PayloadValidationAspect(Validator validator) {
this.validator = validator;
}
@Around("execution(* com.github.questcollector.receiver.function.*.*(..))")
public Object validatePayload(ProceedingJoinPoint joinPoint) throws Throwable {
var payload = joinPoint.getArgs()[0];
var violation = validator.validate(payload);
if (!violation.isEmpty()) {
throw new ConstraintViolationException(violation);
}
return joinPoint.proceed(joinPoint.getArgs());
}
}
마지막으로 Function으로 등록되어 있던 메시지 핸들러 Bean의 데이터 타입을 MessageHandlerFunction으로 변경합니다.
receiver/src/main/java
com.github.questcollector.receiver
ReceiverMessageListener
package com.github.questcollector.receiver;
import com.github.questcollector.receiver.function.MessageHandlerFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ReceiverMessageListener {
private final Logger log = LoggerFactory.getLogger(ReceiverMessageListener.class);
@Bean
public MessageHandlerFunction<MyMessage, MyMessage> receiverMessageHandler() {
return myMessage -> {
log.info("received message: " + myMessage.toString());
MyMessage newMessage = new MyMessage(
myMessage.id(), myMessage.content() + "2"
);
log.info("publish message to receiverPublish binding");
if (log.isDebugEnabled()) {
log.debug("payload: " + newMessage.toString());
}
return newMessage;
};
}
}
앞서 만들었던 MessageHandlerFunction에 payload 검증 기능이 포함되어 있어도 처리가 가능할 것 같습니다.
receiver/src/main/java
com.github.questcollector.receiver.function
MessageHandlerFunction
package com.github.questcollector.receiver.function;
ipackage com.github.questcollector.receiver.function;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import java.util.function.Function;
@FunctionalInterface
public interface MessageHandlerFunction<T, R> extends Function<T, R> {
Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
@Override
default R apply(T t) {
validatePayload(t);
return doApply(t);
};
default void validatePayload(T t) {
var violation = validator.validate(t);
if (!violation.isEmpty()) {
throw new ConstraintViolationException(violation);
}
}
R doApply(T t);
}
Function을 상속한 apply() 메소드는 default 메소드로 만들어서 payload의 검증을 하고 실질적인 액션은 doApply()에서 진행하도록 구현할 수 있습니다.
이렇게 메시지를 전달받았을 때 데이터 검증 오류가 발생했을 경우 어떤 식으로 에러를 처리해야 할까요?
여러 방식이 있습니다. 오류가 난 메시지를 단순히 무시하는 방법이 있을 수 있고, 에러를 핸들링하도록 로직을 따로 작성할 수도 있습니다.
그리고 일반적인 방법으로 사용되는 DLQ(Dead-Letter Queue)를 사용하는 방법도 있습니다.
DLQ는 처리 중 에러가 난 메시지를 다시 메시지 브로커에 전달하여 저장하는 방식입니다. 이후 메시지 브로커에 전달된 메시지는 조건에 맞추어 다양한 방식으로 처리됩니다. 이를테면 다시 원래 메시지 소비자에게 일정 횟수만큼 반복해서 넘겨 주기도 하고, Dead Letter가 발생했다고 운영자에게 알림이 가게 처리할 수도 있습니다.
이번 과정에서는 3번정도 재전송한 다음 해결이 안되면 parkingLot 큐에 쌓아두도록 구성해 보도록 하겠습니다.
rabbitmq-management 서비스에 접속하여 DLQ 처리를 위한 exchange와 queue를 생성하도록 합니다.
exchange 탭에서 test1.DLX의 이름을 가진 direct exchange를 생성합니다.

queue 탭에서 test1.dlq라는 이름의 queue를 생성합니다.

test1.DLX로부터 test1.test1의 routing key로 들어오는 메시지가 test1.dlq 큐에 들어올 수 있도록 binding을 설정합니다.

이렇게 dead letter queue에 대한 자원들을 생성한 다음 receiver의 application.properties에 DLQ에 대한 설정들을 추가합니다.
receiver/src/main/resources/application.properties
server.port=18082
# RabbitMQ
spring.rabbitmq.addresses=${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}
spring.rabbitmq.username=${RABBITMQ_USER:guest}
spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest}
# Kafka
spring.cloud.stream.kafka.binder.brokers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
# spring cloud stream
spring.cloud.function.definition=receiverMessageHandler
spring.cloud.stream.bindings.receiverMessageHandler-in-0.binder=rabbit
spring.cloud.stream.bindings.receiverMessageHandler-in-0.destination=test1
spring.cloud.stream.bindings.receiverMessageHandler-in-0.group=test1
# 추가
spring.cloud.stream.rabbit.bindings.receiverMessageHandler-in-0.consumer.deadLetterExchange=test1.DLX
spring.cloud.stream.rabbit.bindings.receiverMessageHandler-in-0.consumer.deadLetterQueueName=test1.dlq
spring.cloud.stream.rabbit.bindings.receiverMessageHandler-in-0.consumer.deadLetterRoutingKey=test1.test1
spring.cloud.stream.bindings.receiverMessageHandler-out-0.binder=kafka
spring.cloud.stream.bindings.receiverMessageHandler-out-0.destination=test2
이제 producer에서 DLQ에서의 처리를 구현하도록 합니다.
DLQ를 활용한 에러 핸들링은 다양한 방식으로 활용되기 때문에 spring cloud stream RabbitMQ binder에서는 DLQ의 생성 등만 처리할 뿐 구체적인 구현을 프레임워크에서 다루지는 않습니다.
DLQ의 구현은 Spring-rabbit을 활용하여 작성하였습니다.
producer/src/main/java
com.github.questcollector.producer
ProducerPublishDLQHandler
package com.github.questcollector.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import java.util.Optional;
@Configuration
public class ProducerPublishDLQHandler {
private static final String DLQ_QUEUE = "test1.dlq";
private static final String ORIGINAL_EXCHANGE = "test1";
public static final String X_RETRIES_HEADER = "x-retries";
public static final String DLX = "test1.DLX";
public static final String PARKING_LOT = "parkingLot";
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final RabbitTemplate rabbitTemplate;
public ProducerPublishDLQHandler(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
// parkingLot queue 생성
this.rabbitTemplate.execute(channel -> {
String parkingLotQueueName = "%s.%s".formatted(ORIGINAL_EXCHANGE, PARKING_LOT);
channel.queueDeclare(parkingLotQueueName, true, false, false, null);
channel.queueBind(parkingLotQueueName, DLX, PARKING_LOT, null);
return true;
});
}
@RabbitListener(queues = DLQ_QUEUE)
public void logFailedMessage(Message failedMessage) {
// 알림 및 로깅
// 알림 로직 구현...
log.info(failedMessage.getMessageProperties().getHeaders().toString());
// x-retries에서 값 가져오기
Integer retryCount = Optional.ofNullable(
(Integer) failedMessage.getMessageProperties().getHeader(X_RETRIES_HEADER)
).orElse(0);
log.info("x-retries: " + retryCount);
// 3번까지 재전송 해보기
if (retryCount < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retryCount + 1);
failedMessage.getMessageProperties().getHeaders().remove("x-exception-stacktrace");
this.rabbitTemplate.send(ORIGINAL_EXCHANGE, "", failedMessage);
}
// 그래도 다시 DLQ로 전송되었다면 따로 저장하기
else {
log.info("send failedMessage to parkingLot queue");
this.rabbitTemplate.send(DLX, PARKING_LOT, failedMessage);
}
}
}
초기에 test1.parkingLot 큐를 생성하고 test1.DLX exchange와 parkingLot routing key로 바인딩을 구성합니다.
@RabbitListener로 DLQ에 실패 메시지가 들어오면, x-retries 헤더의 값을 기준으로 3번까지 재전송 한 다음, 4번째로 DLQ에 메시지가 들어오면 test1.parkingLot 큐로 메시지를 전송합니다.
한편 spring cloud stream kafka binder에서는 enableDlq, dlqName 옵션을 이용해서 dlq를 지정할 수 있습니다.
producer/src/main/resources/application.properties
server.port=18081
# RabbitMQ
spring.rabbitmq.addresses=${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}
spring.rabbitmq.username=${RABBITMQ_USER:guest}
spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest}
# Kafka
spring.cloud.stream.kafka.binder.brokers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
# spring cloud stream
spring.cloud.function.definition=producerConsume
spring.cloud.stream.bindings.producerPublish-out-0.binder=rabbit
spring.cloud.stream.bindings.producerPublish-out-0.destination=test1
spring.cloud.stream.bindings.producerConsume-in-0.binder=kafka
spring.cloud.stream.bindings.producerConsume-in-0.destination=test2
spring.cloud.stream.bindings.producerConsume-in-0.group=test2
# 추가
spring.cloud.stream.kafka.bindings.producerConsume-in-0.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.producerConsume-in-0.consumer.dlqName=test2.dlq