
DB작업과 메시지 발행을 하나의 묶음(트랜잭션)으로 처리하기 위한 패턴입니다.
DB에 어떤 값을 저장하고, 해당 값이 성공적으로 저장됐다는 이벤트를 외부에 전달하고 싶을 때 트랜잭셔널 아웃박스 패턴을 사용할 수 있습니다.
트랜잭셔널 아웃박스 패턴을 만들어보기 전에 kafka를 활용해 간단한 예시들을 살펴봅시다.
@Service
@RequiredArgsConstructor
public class MyService {
private final MyRepository myRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void save(String message) {
MyEntity entity = MyEntity.builder()
.name(UUID.randomUUID().toString())
.build();
myRepository.save(entity);
kafkaTemplate.send("temp-topic",message);
}
}
myRepository.save()와 카프카에 메시지를 발행하는 kafkaTemplate.send()가 하나의 @Transactional으로 묶여 있습니다.@Service
@RequiredArgsConstructor
public class MyService {
private final MyRepository myRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void save(String message) {
MyEntity entity = MyEntity.builder()
.name(UUID.randomUUID().toString())
.build();
myRepository.save(entity);
kafkaTemplate.send("temp-topic",message);
// 메시지 발행 후 에러 발생
throw new RuntimeException("Err");
}
}
@Service
@RequiredArgsConstructor
public class MyService {
private final MyRepository myRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void save(String message) {
MyEntity entity = MyEntity.builder()
.name(UUID.randomUUID().toString())
.build();
myRepository.save(entity);
kafkaTemplate.send("temp-topic",message);
}
}
카프카 컨테이너의 실행을 중지한 상태에서 실행해 봅시다. 카프카의 동작은 기본적으로 스프링의 트랜잭션과 아무런 관계가 없습니다. TransactionIdPrefix를 통해 Spring Transaction완료 이후에 카프카 이벤트가 커밋되도록 할 수 있지만 이 방법 역시 우리가 생각한대로 스프링 트랜잭션이 롤백됐으면 이벤트 자체가 발행되지 않는 건 아닙니다. 관련된 자세한 내용은 공식 문서와 다른 분이 정리하신 글을 참고하면 좋을거 같습니다.
위 예시를 살펴본 이유는 일반적으로 DB에 데이터를 저장하는 작업과 이벤트를 발행하는 작업이 하나의 묶음으로 처리되지 않는다는 걸 살펴보기 위해서 였습니다.

트랜잭셔널 아웃박스 패턴은 발행하고 싶은 메시지도 DB에 Write합니다. 그리고 메시지가 저장된 테이블을 Message Relay가 읽어 메시지를 발행합니다.
이전과 다른 점은 DB에 쓰기 때문에 @Transactional AOP를 활용할 수 있습니다. 메시지 발행 후 에러가 발생했을 때 두 테이블 모두 롤백되기 때문에 데이터 저장과 메시지 발행이 함께 동작함을 보장할 수 있습니다.
@Service
@RequiredArgsConstructor
public class MyService {
private final MyRepository myRepository;
private final OutboxRepository outboxRepository;
@Transactional
public void save(String message) {
MyEntity entity = MyEntity.builder()
.name(UUID.randomUUID().toString())
.build();
OutboxEntity outboxEntity = OutboxEntity.builder()
.message(message)
.build();
myRepository.save(entity);
outboxRepository.save(outboxEntity);
throw new RuntimeException("Err");
}
}
MessageRelay는 아웃박스 테이블의 데이터를 읽어와 카프카에 메시지를 발행하는 역할을 담당합니다. 아웃박스 테이블을 주기적으로 읽어오는 Polling publisher방식과 CDC를 활용하는 Transaction log tailing방식이 있습니다.
아래는 Polling publisher방식으로 간단한 MessageRelay를 구현한 코드입니다.
@Service
@RequiredArgsConstructor
public class MyMessageRelayService {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedRate = 1000)
public void sendMessage() {
List<OutboxEntity> messages = outboxRepository.findAll();
messages.stream()
.filter(Predicate.not(OutboxEntity::isSend))
.forEach(outboxEntity -> {
kafkaTemplate.send("temp-topic", outboxEntity.getMessage());
outboxEntity.sendMessage();
});
outboxRepository.saveAll(messages);
}
}
간단히 모든 데이터를 가져온 뒤 상태값을 확인하는 방식으로 구현했지만 처음부터 상태값이 false인 데이터만 가져오거나, offset을 통해 가져올 데이터를 확인하는 방식으로 구현할 수 있습니다.
중요 비즈니스 로직은 모두 성공했지만 메시지 발송과 관련된 로직의 문제로 인해 중요 비즈니스 로직이 롤백될 가능성이 있습니다.
비즈니스 로직의 재시도를 최대한 피하고 싶거나, 메시지의 유실이 조금은 허용되는 상황이라면 트랜잭셔널 아웃박스 패턴을 쓰지 않는게 더 좋을 수 있습니다.
DB로부터 메시지를 읽어 브로커에 메시지를 Produce하는 Message Relay에 문제가 생기면 메시지 발행이 지연될 수 있습니다.
MessageRelay서비스가 여러 인스턴스로 동작할 때 메시지 중복 처리 문제가 발생할 수 있습니다.