@TransactionalEventListener(phase = AFTER_COMMIT) with KAFKA

Kevin·2025년 6월 17일
1

Spring

목록 보기
27/27
post-thumbnail

서론

최근 카프카를 사용하여 메세지를 송, 수신 하는 모듈을 구현 중에 있다.

간단히 설명하면 프로듀서에서 저장을 한 데이터를 카프카를 통해 전달 하고, 컨슈머에서 이를 받아 데이터를 업데이트 시키는 작업을 하고 있다.

이 때 나는 분명 프로듀서를 통해서 메세지가 발행 된 것을 확인 하고 컨슈머에서도 해당 토픽에 메세지가 정상적으로 소비된 것을 확인 하였다.

그러나 컨슈머에서 이를 받아서 데이터를 업데이트 할 때 프로듀서에서 분명 저장 되었어야 할 데이터가 저장 되어있지 않아 업데이트를 하지 못했다.

결론부터 말하면 제목에서 이야기 했듯이 @TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션의 부재로 인해서 문제가 발생했었다.

이번 글에서는 문제가 발생했던 이유와 해당 어노테이션의 역할에 대해서 이야기 해보려고 한다.


본론

문제 발생 원인에 대해서 이야기 하기 전 예시 코드들을 먼저 이야기 하고자 한다.


카프카 프로듀서

@Service
@RequiredArgsConstructor
public class NotificationService {

    private final ApplicationEventPublisher publisher;
    
    private final RequestLogRepository requestLogRepository;

    @Transactional
    public void sendNotification(NotificationRequest request) {
		    
		    RequestLog savedRequestLog = requestLogRepository.save(...);
    
        NotificationMessage message = new NotificationMessage(
                request.getRecipientToken(),
                request.getTitle(),
                request.getBody(),
                savedRequestLog.getId()
        );

        publisher.publishEvent(message); // Kafka 전송 이벤트 발행
    }
}

위 코드는 RequestLog 엔티티를 DB에 저장하고 관련 정보를 카프카에 메세지를 보내기 위해 DTO 메세지 객체를 생성해 이를 이벤트 핸들러로 보내는 역할을 한다.

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationProducer {

    private final KafkaTemplate<String, NotificationMessage> kafkaTemplate;

    @EventListener
    @Async
    public void handleNotificationEvent(NotificationMessage message) {
        kafkaTemplate.send("notification-topic", message)
            .addCallback(
                result -> log.info("Kafka message sent: {}", result.getRecordMetadata()),
                error -> log.error("Kafka send failed", error)
            );
    }
}

이 코드는 위 코드에서 보낸 DTO 메세지 객체 를 받아서 이를 비동기로 카프카에 보내는 역할을 하는 코드이다.


카프카 컨슈머

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationConsumer {

    private final RequestLogRepository requestLogRepository;

    @KafkaListener(
        topics = "notification-topic",
        groupId = "notification-group",
        containerFactory = "notificationKafkaListenerContainerFactory"
    )
    @Transactional
    public void consume(NotificationMessage message) {
        log.info("Consumed message: {}", message.getTitle());

        sendToFcm(message);
    }

    private void sendToFcm(NotificationMessage message) {
        RequestLog logEntity = requestLogRepository.findById(message.getRequestLogId())
            .orElse(null);

        if (logEntity == null) {
            return;
        }

	      // 로직 처리
}

컨슈머에서는 프로듀서가 보낸 메세지를 받아 파싱하고, 이 중 RequestLog 엔티티를 DB에서 조회 해서 업데이트를 수행 하는 역할을 한다.

크게 복잡할 것 없는 플로우이다.

이 때 문제 상황을 코드의 흐름을 따라서 가보자.

현재 프로듀서에서 컨슈머까지 메세지는 정상적으로 보내졌음을 log를 통해서 확인할 수 있었다.

그리고 만약 업데이트나 로직을 처리 하는데 있어서 런타임 예외가 발생 했다면 현재 DLQ로 보내도록 설정이 되어있고, 로그도 남도록 해두었기에 빠르게 파악할 수 있었을 것이다.

그렇다면 남은 문제가 발생할 수 있는 부분은 아래 코드 뿐이다.

if (logEntity == null) {
    return;
}

해당 부분에서 문제가 발생 했다는 것은 프로듀서에서 DB 저장간 문제가 발생 했다는 것이다.

그러나 카프카 메세지에서는 분명 ID(savedRequestLog.getId())를 반환하고 있었다.

어떤 문제가 발생 한 것일까


@TransactionalEventListener(phase = AFTER_COMMIT)

프로듀서 코드를 다시 살펴보자.


    @Transactional
    public void sendNotification(NotificationRequest request) {
		    
		    RequestLog savedRequestLog = requestLogRepository.save(...);
    
        NotificationMessage message = new NotificationMessage(
                request.getRecipientToken(),
                request.getTitle(),
                request.getBody(),
                savedRequestLog.getId()
        );

        publisher.publishEvent(message); // Kafka 전송 이벤트 발행
    }
}

현재 엔티티를 저장 하고, 이를 관심사의 분리를 위해서 이벤트 리스너로 전달 하고 있다.

이를 트랜잭션 관점에서 보면 이벤트 리스너로 메세지를 전달하는 시점에 트랜잭션이 아직 커밋 되지 않았다.

그렇기에 프로듀서에서 엔티티 생성 후 아직 커밋 되지 않은 상태의 데이터의 ID 값을 이벤트 리스너를 통해 카프카에 전송했을 때

이를 컨슈머에서 해당 ID 값으로 조회할 때 DB에 아직 저장이 되어 있지 않아서 분기문으로 return 된 것이다.

위의 문제를 @TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션을 통해서 해결 할 수 있다.

@TransactionalEventListener(phase = AFTER_COMMIT) 어노테이션은 트랜잭션이 성공적으로 커밋 된 이후 리스너 로직을 처리할 수 있도록 트랜잭션의 성공적인 수행을 보장해준다.

위의 문제를 한 문장으로 요약하면 다음과 같다.

아직 커밋 되지 않은 데이터를 이벤트 리스너를 통해 수행 할 때 발생하는 데이터 정합성 문제

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    @Async
    public void handleNotificationEvent(NotificationMessage message) {
        kafkaTemplate.send("notification-topic", message)
            .addCallback(
                result -> log.info("Kafka message sent: {}", result.getRecordMetadata()),
                error -> log.error("Kafka send failed", error)
            );
    }

이를 어노테이션을 통해서 이벤트 호출 메서드에서 완벽히 트랜잭션을 커밋한 후 완전한 데이터를 카프카에 넘겨 문제를 해결 할 수 있었다.

이 어노테이션의 phase 인자 값은 다음의 4가지 옵션을 지정할 수 있다.

  • AFTER_COMMIT (트랜잭션이 성공했을 때 실행)
  • AFTER_ROLLBACK (트랜잭션 롤백시 실행)
  • AFTER_COMPLETE 트랜잭션 완료시 (AFTER_COMMIT+AFTER_ROLLBACK)
  • BEFORE_COMMIT (트랜잭션 commit 되기전에)
profile
Hello, World! \n

0개의 댓글