
최근 카프카를 사용하여 메세지를 송, 수신 하는 모듈을 구현 중에 있다.
간단히 설명하면 프로듀서에서 저장을 한 데이터를 카프카를 통해 전달 하고, 컨슈머에서 이를 받아 데이터를 업데이트 시키는 작업을 하고 있다.
이 때 나는 분명 프로듀서를 통해서 메세지가 발행 된 것을 확인 하고 컨슈머에서도 해당 토픽에 메세지가 정상적으로 소비된 것을 확인 하였다.
그러나 컨슈머에서 이를 받아서 데이터를 업데이트 할 때 프로듀서에서 분명 저장 되었어야 할 데이터가 저장 되어있지 않아 업데이트를 하지 못했다.
결론부터 말하면 제목에서 이야기 했듯이 @TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션의 부재로 인해서 문제가 발생했었다.
이번 글에서는 문제가 발생했던 이유와 해당 어노테이션의 역할에 대해서 이야기 해보려고 한다.
문제 발생 원인에 대해서 이야기 하기 전 예시 코드들을 먼저 이야기 하고자 한다.
@Service
@RequiredArgsConstructor
public class NotificationService {
private final ApplicationEventPublisher publisher;
private final RequestLogRepository requestLogRepository;
@Transactional
public void sendNotification(NotificationRequest request) {
RequestLog savedRequestLog = requestLogRepository.save(...);
NotificationMessage message = new NotificationMessage(
request.getRecipientToken(),
request.getTitle(),
request.getBody(),
savedRequestLog.getId()
);
publisher.publishEvent(message); // Kafka 전송 이벤트 발행
}
}
위 코드는 RequestLog 엔티티를 DB에 저장하고 관련 정보를 카프카에 메세지를 보내기 위해 DTO 메세지 객체를 생성해 이를 이벤트 핸들러로 보내는 역할을 한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationProducer {
private final KafkaTemplate<String, NotificationMessage> kafkaTemplate;
@EventListener
@Async
public void handleNotificationEvent(NotificationMessage message) {
kafkaTemplate.send("notification-topic", message)
.addCallback(
result -> log.info("Kafka message sent: {}", result.getRecordMetadata()),
error -> log.error("Kafka send failed", error)
);
}
}
이 코드는 위 코드에서 보낸 DTO 메세지 객체 를 받아서 이를 비동기로 카프카에 보내는 역할을 하는 코드이다.
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationConsumer {
private final RequestLogRepository requestLogRepository;
@KafkaListener(
topics = "notification-topic",
groupId = "notification-group",
containerFactory = "notificationKafkaListenerContainerFactory"
)
@Transactional
public void consume(NotificationMessage message) {
log.info("Consumed message: {}", message.getTitle());
sendToFcm(message);
}
private void sendToFcm(NotificationMessage message) {
RequestLog logEntity = requestLogRepository.findById(message.getRequestLogId())
.orElse(null);
if (logEntity == null) {
return;
}
// 로직 처리
}
컨슈머에서는 프로듀서가 보낸 메세지를 받아 파싱하고, 이 중 RequestLog 엔티티를 DB에서 조회 해서 업데이트를 수행 하는 역할을 한다.
크게 복잡할 것 없는 플로우이다.
이 때 문제 상황을 코드의 흐름을 따라서 가보자.
현재 프로듀서에서 컨슈머까지 메세지는 정상적으로 보내졌음을 log를 통해서 확인할 수 있었다.
그리고 만약 업데이트나 로직을 처리 하는데 있어서 런타임 예외가 발생 했다면 현재 DLQ로 보내도록 설정이 되어있고, 로그도 남도록 해두었기에 빠르게 파악할 수 있었을 것이다.
그렇다면 남은 문제가 발생할 수 있는 부분은 아래 코드 뿐이다.
if (logEntity == null) {
return;
}
해당 부분에서 문제가 발생 했다는 것은 프로듀서에서 DB 저장간 문제가 발생 했다는 것이다.
그러나 카프카 메세지에서는 분명 ID(savedRequestLog.getId())를 반환하고 있었다.
어떤 문제가 발생 한 것일까
프로듀서 코드를 다시 살펴보자.
@Transactional
public void sendNotification(NotificationRequest request) {
RequestLog savedRequestLog = requestLogRepository.save(...);
NotificationMessage message = new NotificationMessage(
request.getRecipientToken(),
request.getTitle(),
request.getBody(),
savedRequestLog.getId()
);
publisher.publishEvent(message); // Kafka 전송 이벤트 발행
}
}
현재 엔티티를 저장 하고, 이를 관심사의 분리를 위해서 이벤트 리스너로 전달 하고 있다.
이를 트랜잭션 관점에서 보면 이벤트 리스너로 메세지를 전달하는 시점에 트랜잭션이 아직 커밋 되지 않았다.
그렇기에 프로듀서에서 엔티티 생성 후 아직 커밋 되지 않은 상태의 데이터의 ID 값을 이벤트 리스너를 통해 카프카에 전송했을 때
이를 컨슈머에서 해당 ID 값으로 조회할 때 DB에 아직 저장이 되어 있지 않아서 분기문으로 return 된 것이다.
위의 문제를 @TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션을 통해서 해결 할 수 있다.
@TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션은 트랜잭션이 성공적으로 커밋 된 이후 리스너 로직을 처리할 수 있도록 트랜잭션의 성공적인 수행을 보장해준다.
위의 문제를 한 문장으로 요약하면 다음과 같다.
아직 커밋 되지 않은 데이터를 이벤트 리스너를 통해 수행 할 때 발생하는 데이터 정합성 문제
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void handleNotificationEvent(NotificationMessage message) {
kafkaTemplate.send("notification-topic", message)
.addCallback(
result -> log.info("Kafka message sent: {}", result.getRecordMetadata()),
error -> log.error("Kafka send failed", error)
);
}
이를 어노테이션을 통해서 이벤트 호출 메서드에서 완벽히 트랜잭션을 커밋한 후 완전한 데이터를 카프카에 넘겨 문제를 해결 할 수 있었다.
이 어노테이션의 phase 인자 값은 다음의 4가지 옵션을 지정할 수 있다.