시스템이 복잡해지면서 여러 이벤트가 함께 처리된다. 이때 이벤트들의 일관성
을 보장하기위해 카프카 트랜잭션이 필요하다. 우리가 잘 알고있는 데이터베이스에서의 트랜잭션 처럼 이벤트 모두 함께 처리 혹은 이벤트 모두 처리하지 않는 원자성이 보장되야한다.
다음 그림은 트랜잭션이 없는 카프카 이벤트 시스템이다. 해당 상황은 앨리스가 밥에게 10달러를 전송하는 상황이다.
1. 앨리스가 밥에게 10달러를 송금하는 이벤트를 컨슈머가 읽는다.
2. Funds Transfer App에서 balances topic으로 앨리스의 10달러를 출금 이벤트를 전송한다.
그런데, 이때 Funds Transfer App에서 장애
가 발생했고 컨슈머 그룹은 커밋되지 못했다. 그리고 서버를 새로 시작했다고 하자.
1. 새로 시작은 Funds Transfer App 서버는 다시 엘리스가 밥에게 10달러를 전송한 이벤트를 읽는다.
2. 다시 한 번 balances topic으로 앨리스의 10달러를 출금 이벤트를 전송한다.
3. 밥이 10달러 받은 이벤트를 입금 브로커의 balances topic으로 전송한다.
4. 컨슈머 오프셋이 커밋된다.
5. 컨슈머가 앨리스와 밥의 이벤트를 처리한다.
앨리스는 총 20달러를 송금하게 되었다.
트랜잭션의 부재로 일관성이 지켜지지 못했다.
transactional.id
는 프로듀서에서 설정되며 애플리케이션을 다시 시작할 때 트랜잭션 프로듀서를 식별할 수 있다.트랜잭션 아이디
, PID
, 에포크
를 가지고 __transaction_state 토픽에 기록힌다.토픽
과 파티션
을 코디네이터에게 알린다.
1. 새 인스턴스는 코디네이터로 PID에 대한 초기화 요청을 전송한다. 코디네이터는 보류 중인 트랜잭션이 있는 것을 확인하고 이전 트랜잭션의 영향을 받는 모든 파티션에 중단 마커를 추가한다. 또한 에포크도 생성힌다. 새 인스턴스는 PID와 새 에포크를 수신하고 정상적인 처리를 계속한다.
2. 컨슈머의 isolation.level
을 read_committed
로 설정하여 문제가 생긴 이벤트를 무시한다.
각 단계를 정상적으로 수행하면 트랜잭션 코디네이터 내부 _transaction_state 토픽과 _consumer_offsets 토픽에 커밋 마커를 추기한다. 소비자는 이 마커를 보고 이벤트를 처리한다.
시나리오를 크게 두 가지로 나눌 수 있겠다.
이렇게 사용하기 위해서는 카프카 프로듀서
에서 transaction-id-prefix
를 설정하고 카프카 컨슈머
에서 isolation.level
을 read_committed
로 변경해야한다.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
스프링 이벤트
기반으로 처리해 보겠다.
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
를 사용하여 커밋 후 카프카 메시지를 샌딩하는 방법