🚨해당 글은 Spring Kafka에 대한 사용방법과 실제 업무의 경험을 곁들인 적용법을 다루고 있습니다.
Apache Kafka에 대한 내용은 다루지 않고 있음을 참고부탁드립니다.
어느덧 개발을 시작한지 만 5년이 지나갔고 나를 되돌아보니 공부에 소홀해지고 있다는 생각이 마구마구들어 다시 마음을 다잡고 새로운 마음가짐으로 처음부터 다시 차근차근 내가 알고있는 것들을 정리해보자라는 생각에 무엇부터 해볼까 생각해보던중 MSA 구조에서 빠질 수 없는 메세지 브로커 Kafka에 대해 정리를 해야겠다 결정하였다.
초반에는 Spring kafka문서에 설명되어 있는 Producer에 대해서 설명하고 이후 Kafka Transaction에 대해서 설명할 예정이다.
역시나 기본적인 설정 방법은 공식문서에 친절하게 설명되어있다.
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
Producer의 설정을 Map으로 구성하여 ProducerFactory를 생성한 이후 KafkaTemplate를 만들어 해당 객체를 통해서 Message를 전송하면 된다.
비동기 전송 방식 - Non Blocking (Async)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
동기 전송 방식 - Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
KafkaTemplate의 Message전송은 기본적으로 CompletableFuture 비동기로 진행되며 위와 같이 Blocking 구현을 통해 동기식으로 전송도 가능하다.
In version
3.0, the methods that previously returnedListenableFuturehave been changed to returnCompletableFuture. To facilitate the migration, the 2.9 version added a methodusingCompletableFuture()which provided the same methods withCompletableFuturereturn types; this method is no longer available.
버전 3.0에서의 변경점으로 기존 리턴 타입이 ListenableFuture에서 CompletableFuture으로 변경되었다고 하니 참고하자.
Apache Kafka에서는 ProducerInterceptor를 제공하며 3.0부터는 Spring Bean으로 관리가 가능하다.
onSend
ProducerRecord가 publish되기 이전에 실행되며 ProducerRecord에 접근가능하며 수정이 가능하다.
onAcknowledgement
Message 전송 결과에 따라 메소드가 호출된다. 정상 전송된 경우 Exception이 null이며 전송 실패의 경우 Exception에 해당 예외가 전달되어 호출된다.
@Slf4j
@Component
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
log.info("message body -> {}", record.value());
log.info("message header -> {}", record.headers());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
log.info("topic -> {}", metadata.topic());
log.info("partition -> {}", metadata.partition());
if (exception != null) {
log.error("error ->{}", exception);
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(KafkaProducerInterceptor kafkaProducerInterceptor) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory());
kafkaTemplate.setProducerInterceptor(kafkaProducerInterceptor);
return kafkaTemplate;
}
Spring Kafka에서는 Producer 전송 결과를 수신하는 ProducerListener를 제공한다.
onSuccess
Message가 전송된 이후 정상 처리된 경우 호출된다.
onError
Message가 전송에 실패한 경우 호출된다.
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
ProducerListener.super.onSuccess(producerRecord, recordMetadata);
log.info("message body -> {}", producerRecord.value());
log.info("message header -> {}", producerRecord.headers());
log.info("message topic -> {}", recordMetadata.topic());
log.info("message offset -> {}", recordMetadata.offset());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
ProducerListener.super.onError(producerRecord, recordMetadata, exception);
log.info("message body -> {}", producerRecord.value());
log.info("message header -> {}", producerRecord.headers());
log.info("message topic -> {}", recordMetadata.topic());
log.info("message offset -> {}", recordMetadata.offset());
log.error("message exception -> {}", exception);
}
}
public KafkaTemplate<String, String> kafkaTemplate2(KafkaProducerInterceptor kafkaProducerInterceptor, KafkaProducerListener kafkaProducerListener) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory2());
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
이제부터는 프로젝트에서 Kafka Producer를 사용하면서 Transaction 관련하여 이슈가 있었던 부분들에 대해 정리를 해보겠다.
백엔드의 로직은 Transaction과의 싸움이라고 생각한다. 다음 그림과 시나리오 같이 이벤트의 정의는 Zero Payload 방식으로 사용자 등록 이벤트 를 발행하여 타 서비스들이 해당 이벤트를 구독하고 있다고 가정해보겠다.

- 사용자 서비스에 사용자가 등록된다.
- 사용자 서비스에서
사용자 등록 이벤트를 발행한다.- 이를 구독하고 있는 서비스는 Zero Payload로 구성된
사용자 등록 이벤트에 포함된userId로 사용자 서비스를 조회하여 각 서비스의 로직을 구현했다.
사용자 서비스의 로직은 다음과 같을것이다.
@Transactional
public void case1() {
User user1 = new User(UUID.randomUUID().toString(), 1);
userRepository.save(user1);
kafkaTemplate.send("TEST1", new UserEvent(EventType.Created, user1.getId()));
... // 엄청 복잡 오래걸리는 로직
}
위와 같은 코드에서 기본 Producer 방식으로 해당 시나리오가 정상적으로 동작할것 같은가?
정답은 문제가 발생한다 이다.
문제가 발생하는 부분은 kafkaTemplate.send와 Spring Transaction의 실행 시점 차이이다.
- 사용자 서비스에서
사용자 저장이후사용자 등록 이벤트발행- 하지만 아직
Spring Transaction Commit이전 상황이기 때문에DB Commit이 발생하지 않았지만사용자 등록 이벤트를 발행사용자 등록 이벤트를 구독하고 있는 상품, 주문 서비스에서 해당userId조회시 예외 발생!!

위와 같이 Spring Transaction이후 이벤트 발행을 하기 위해서는 Kafka Transaction를 설정해야한다. 이 역시 공식문서에 상세히 설명이 나와있다.
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
DefaultKafkaProducerFactory에 TransactionIdPrefix를 설정하게 된다면 Kafka Transaction으로 동작하게되며 이벤트는 kafkaTemplate.send 라인에서 바로 발행되지만 Spring Transaction이후 해당 이벤트가 Commit된다.
Kafka Transaction 설정 이전2023-12-23T14:29:13.360+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:29:13.361+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:29:13.364+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null)
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@18f60398] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null))
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null)
2023-12-23T14:29:13.368+09:00 TRACE 31664 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null), metadata: TEST1-0@211
2023-12-23T14:29:13.368+09:00 TRACE 31664 --- [ad | producer-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@18f60398] close(PT5S)
2023-12-23T14:29:16.366+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
모든 Transacation이 종료되기 이전에 Sent: ProducerRecord Kafka 이벤트를 발행하는 것을 확인 할 수 있다.
Kafka Transaction 설정 이후2023-12-23T14:32:16.832+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:32:16.832+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:32:16.836+09:00 DEBUG 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] beginTransaction()
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null)
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null))
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null)
2023-12-23T14:32:16.842+09:00 TRACE 31707 --- [| producer-tx-0] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null), metadata: TEST1-0@214
2023-12-23T14:32:19.841+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:32:19.859+09:00 DEBUG 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] commitTransaction()
2023-12-23T14:32:19.864+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] close(PT5S)
위에 로그를 보면 기대와 다르게 무엇인가 이상하다!! Kafka Transaction 설정 이전과 같이 모든 Transacation이 종료되기 이전에 Sent: ProducerRecord Kafka 이벤트를 발행하고 있는것이다!!!
로그를 다시 자세히 들여다 보면 설정 이전과 다른 부분이 존재한다.
beginTransaction(),commitTransaction()두 개의 과정이 추가되었다.Kafka Transaction은DB Transaction과 다르게 우선은 이벤트를 발행한 이후 해당 이벤트에Transaction결과를 마크하는 방식이다.
2023-12-23T14:52:46.831+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case2]
2023-12-23T14:52:46.833+09:00 DEBUG 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] beginTransaction()
2023-12-23T14:52:46.833+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null)
2023-12-23T14:52:46.833+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null))
2023-12-23T14:52:46.836+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null)
2023-12-23T14:52:46.836+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case2]
2023-12-23T14:52:46.844+09:00 WARN 32127 --- [nio-8080-exec-2] o.m.jdbc.message.server.ErrorPacket : Error: 1062-23000: Duplicate entry 'TEST' for key 'PRIMARY'
2023-12-23T14:52:46.850+09:00 WARN 32127 --- [nio-8080-exec-2] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 1062, SQLState: 23000
2023-12-23T14:52:46.850+09:00 ERROR 32127 --- [nio-8080-exec-2] o.h.engine.jdbc.spi.SqlExceptionHelper : (conn=546) Duplicate entry 'TEST' for key 'PRIMARY'
2023-12-23T14:52:46.855+09:00 TRACE 32127 --- [| producer-tx-0] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null), metadata: TEST1-0@224
2023-12-23T14:52:46.862+09:00 DEBUG 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] abortTransaction()
2023-12-23T14:52:46.862+09:00 INFO 32127 --- [nio-8080-exec-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-tx-0, transactionalId=tx-0] Aborting incomplete transaction
2023-12-23T14:52:46.864+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] close(PT5S)
위의 로그는 예외가 발생하였을 경우이다. 성공때와 동일하게 Kafka 이벤트는 발행하고 있지만 abortTransaction()가 발생한것을 확인할 수 있다.
지금까지는 Kafka Transaction 동작에 대해서 알아봤다. 그런데 의문점이 있을 것이다. Kafka Transaction 설정 이전에도 이벤트는 발행되고 Spring Transaction 예외 발생시에도 이벤트는 발행된다.
그러하다 어쨌든 이벤트는 무조건 발행되는 것이다. 즉 구독자가 구독하는 방식이 중요하다!!

Consumer Configuration에 isolation.level의 값을 read_committed으로 설정하게 된다면 Kafka Transaction이 Commit된 이벤트만 구독하게된다.
이와 같이 Kafka Transaction를 적용하게 된다면 알아두어야 사항들이 존재한다.
이와 같은 방식으로 Spring Transaction의 결과에 따라 이벤트의 Transaction를 처리 할 수 있다.
지금 까지는 Spring Kafka의 문서에 나와있는 방식이였으며 다음으로 작성할 방법은 현재 회사에서 적용중인 Spring Transaction 결과에 따른 이벤트 발행 방식이다.
현재 우리의 해결과제는 Spring Transaction 이후 Kafka Producer가 되는 것이다. 그렇다 TransactionalEventListener를 활용하여 TransactionPhase.AFTER_COMMIT 이후에 kafkaTemplate.send를 실행시키면 되는것이다.
@Transactional
public void case3() {
User user = new User(UUID.randomUUID().toString(), 1);
entityManager.persist(user);
applicationEventPublisher.publishEvent(new UserEvent(EventType.Created, user.getId()));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void test(UserEvent userEvent) {
kafkaTemplate.send("TEST1", userEvent);
}
위와 같이 로직을 구현한다면 Transaction의 Commit 이후에만 이벤트가 발행되며 예외가 발생한 경우에는 아예 이벤트가 발행되지 않게된다.
TransactionalEventListener를 활용하게 된다면 Kafka Transacation 적용시 구독자가 알아두어야할 사항들에 대해서 고려하지 않아도 된다는 장점이 존재한다.

지금까지는 Kafka Producer에 대해서 정리를 하였다. 다음 글에서는 Kafka Consumer에 대한 내용을 정리하겠다.