🚨해당 글은 Spring Kafka에 대한 사용방법과 실제 업무의 경험을 곁들인 적용법을 다루고 있습니다.
Apache Kafka에 대한 내용은 다루지 않고 있음을 참고부탁드립니다.
어느덧 개발을 시작한지 만 5년이 지나갔고 나를 되돌아보니 공부에 소홀해지고 있다는 생각이 마구마구들어 다시 마음을 다잡고 새로운 마음가짐으로 처음부터 다시 차근차근 내가 알고있는 것들을 정리해보자라는 생각에 무엇부터 해볼까 생각해보던중 MSA
구조에서 빠질 수 없는 메세지 브로커 Kafka
에 대해 정리를 해야겠다 결정하였다.
초반에는 Spring kafka
문서에 설명되어 있는 Producer
에 대해서 설명하고 이후 Kafka Transaction
에 대해서 설명할 예정이다.
역시나 기본적인 설정 방법은 공식문서에 친절하게 설명되어있다.
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
Producer
의 설정을 Map
으로 구성하여 ProducerFactory
를 생성한 이후 KafkaTemplate
를 만들어 해당 객체를 통해서 Message
를 전송하면 된다.
비동기 전송 방식 - Non Blocking (Async)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
동기 전송 방식 - Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
KafkaTemplate
의 Message
전송은 기본적으로 CompletableFuture
비동기로 진행되며 위와 같이 Blocking
구현을 통해 동기식으로 전송도 가능하다.
In version
3.0
, the methods that previously returnedListenableFuture
have been changed to returnCompletableFuture
. To facilitate the migration, the 2.9 version added a methodusingCompletableFuture()
which provided the same methods withCompletableFuture
return types; this method is no longer available.
버전 3.0
에서의 변경점으로 기존 리턴 타입이 ListenableFuture
에서 CompletableFuture
으로 변경되었다고 하니 참고하자.
Apache Kafka
에서는 ProducerInterceptor
를 제공하며 3.0
부터는 Spring Bean
으로 관리가 가능하다.
onSend
ProducerRecord
가 publish
되기 이전에 실행되며 ProducerRecord
에 접근가능하며 수정이 가능하다.
onAcknowledgement
Message
전송 결과에 따라 메소드가 호출된다. 정상 전송된 경우 Exception
이 null
이며 전송 실패의 경우 Exception
에 해당 예외가 전달되어 호출된다.
@Slf4j
@Component
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
log.info("message body -> {}", record.value());
log.info("message header -> {}", record.headers());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
log.info("topic -> {}", metadata.topic());
log.info("partition -> {}", metadata.partition());
if (exception != null) {
log.error("error ->{}", exception);
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(KafkaProducerInterceptor kafkaProducerInterceptor) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory());
kafkaTemplate.setProducerInterceptor(kafkaProducerInterceptor);
return kafkaTemplate;
}
Spring Kafka
에서는 Producer
전송 결과를 수신하는 ProducerListener
를 제공한다.
onSuccess
Message
가 전송된 이후 정상 처리된 경우 호출된다.
onError
Message
가 전송에 실패한 경우 호출된다.
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
ProducerListener.super.onSuccess(producerRecord, recordMetadata);
log.info("message body -> {}", producerRecord.value());
log.info("message header -> {}", producerRecord.headers());
log.info("message topic -> {}", recordMetadata.topic());
log.info("message offset -> {}", recordMetadata.offset());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
ProducerListener.super.onError(producerRecord, recordMetadata, exception);
log.info("message body -> {}", producerRecord.value());
log.info("message header -> {}", producerRecord.headers());
log.info("message topic -> {}", recordMetadata.topic());
log.info("message offset -> {}", recordMetadata.offset());
log.error("message exception -> {}", exception);
}
}
public KafkaTemplate<String, String> kafkaTemplate2(KafkaProducerInterceptor kafkaProducerInterceptor, KafkaProducerListener kafkaProducerListener) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory2());
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
이제부터는 프로젝트에서 Kafka Producer
를 사용하면서 Transaction
관련하여 이슈가 있었던 부분들에 대해 정리를 해보겠다.
백엔드의 로직은 Transaction
과의 싸움이라고 생각한다. 다음 그림과 시나리오 같이 이벤트의 정의는 Zero Payload 방식으로 사용자 등록 이벤트
를 발행하여 타 서비스들이 해당 이벤트를 구독하고 있다고 가정해보겠다.
- 사용자 서비스에 사용자가 등록된다.
- 사용자 서비스에서
사용자 등록 이벤트
를 발행한다.- 이를 구독하고 있는 서비스는 Zero Payload로 구성된
사용자 등록 이벤트
에 포함된userId
로 사용자 서비스를 조회하여 각 서비스의 로직을 구현했다.
사용자 서비스의 로직은 다음과 같을것이다.
@Transactional
public void case1() {
User user1 = new User(UUID.randomUUID().toString(), 1);
userRepository.save(user1);
kafkaTemplate.send("TEST1", new UserEvent(EventType.Created, user1.getId()));
... // 엄청 복잡 오래걸리는 로직
}
위와 같은 코드에서 기본 Producer
방식으로 해당 시나리오가 정상적으로 동작할것 같은가?
정답은 문제가 발생한다 이다.
문제가 발생하는 부분은 kafkaTemplate.send
와 Spring Transaction
의 실행 시점 차이이다.
- 사용자 서비스에서
사용자 저장
이후사용자 등록 이벤트
발행- 하지만 아직
Spring Transaction Commit
이전 상황이기 때문에DB Commit
이 발생하지 않았지만사용자 등록 이벤트
를 발행사용자 등록 이벤트
를 구독하고 있는 상품, 주문 서비스에서 해당userId
조회시 예외 발생!!
위와 같이 Spring Transaction
이후 이벤트 발행을 하기 위해서는 Kafka Transaction
를 설정해야한다. 이 역시 공식문서에 상세히 설명이 나와있다.
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
DefaultKafkaProducerFactory
에 TransactionIdPrefix
를 설정하게 된다면 Kafka Transaction
으로 동작하게되며 이벤트는 kafkaTemplate.send
라인에서 바로 발행되지만 Spring Transaction
이후 해당 이벤트가 Commit
된다.
Kafka Transaction
설정 이전2023-12-23T14:29:13.360+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:29:13.361+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:29:13.364+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null)
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@18f60398] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null))
2023-12-23T14:29:13.365+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null)
2023-12-23T14:29:13.368+09:00 TRACE 31664 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@492bc113, timestamp=null), metadata: TEST1-0@211
2023-12-23T14:29:13.368+09:00 TRACE 31664 --- [ad | producer-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@18f60398] close(PT5S)
2023-12-23T14:29:16.366+09:00 TRACE 31664 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
모든 Transacation
이 종료되기 이전에 Sent: ProducerRecord
Kafka
이벤트를 발행하는 것을 확인 할 수 있다.
Kafka Transaction
설정 이후2023-12-23T14:32:16.832+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:32:16.832+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2023-12-23T14:32:16.836+09:00 DEBUG 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] beginTransaction()
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null)
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null))
2023-12-23T14:32:16.836+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null)
2023-12-23T14:32:16.842+09:00 TRACE 31707 --- [| producer-tx-0] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.UserEvent@70d7f904, timestamp=null), metadata: TEST1-0@214
2023-12-23T14:32:19.841+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case1]
2023-12-23T14:32:19.859+09:00 DEBUG 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] commitTransaction()
2023-12-23T14:32:19.864+09:00 TRACE 31707 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@724c462c] close(PT5S)
위에 로그를 보면 기대와 다르게 무엇인가 이상하다!! Kafka Transaction
설정 이전과 같이 모든 Transacation
이 종료되기 이전에 Sent: ProducerRecord
Kafka
이벤트를 발행하고 있는것이다!!!
로그를 다시 자세히 들여다 보면 설정 이전과 다른 부분이 존재한다.
beginTransaction()
,commitTransaction()
두 개의 과정이 추가되었다.Kafka Transaction
은DB Transaction
과 다르게 우선은 이벤트를 발행한 이후 해당 이벤트에Transaction
결과를 마크하는 방식이다.
2023-12-23T14:52:46.831+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [io.velog.youmakemesmile.kafka.ProducerController.case2]
2023-12-23T14:52:46.833+09:00 DEBUG 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] beginTransaction()
2023-12-23T14:52:46.833+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null)
2023-12-23T14:52:46.833+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] send(ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null))
2023-12-23T14:52:46.836+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null)
2023-12-23T14:52:46.836+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [io.velog.youmakemesmile.kafka.ProducerController.case2]
2023-12-23T14:52:46.844+09:00 WARN 32127 --- [nio-8080-exec-2] o.m.jdbc.message.server.ErrorPacket : Error: 1062-23000: Duplicate entry 'TEST' for key 'PRIMARY'
2023-12-23T14:52:46.850+09:00 WARN 32127 --- [nio-8080-exec-2] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 1062, SQLState: 23000
2023-12-23T14:52:46.850+09:00 ERROR 32127 --- [nio-8080-exec-2] o.h.engine.jdbc.spi.SqlExceptionHelper : (conn=546) Duplicate entry 'TEST' for key 'PRIMARY'
2023-12-23T14:52:46.855+09:00 TRACE 32127 --- [| producer-tx-0] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=TEST1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 118, 101, 108, 111, 103, 46, 121, 111, 117, 109, 97, 107, 101, 109, 101, 115, 109, 105, 108, 101, 46, 107, 97, 102, 107, 97, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=io.velog.youmakemesmile.kafka.User@294d7cfb, timestamp=null), metadata: TEST1-0@224
2023-12-23T14:52:46.862+09:00 DEBUG 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] abortTransaction()
2023-12-23T14:52:46.862+09:00 INFO 32127 --- [nio-8080-exec-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-tx-0, transactionalId=tx-0] Aborting incomplete transaction
2023-12-23T14:52:46.864+09:00 TRACE 32127 --- [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7495d73a] close(PT5S)
위의 로그는 예외가 발생하였을 경우이다. 성공때와 동일하게 Kafka
이벤트는 발행하고 있지만 abortTransaction()
가 발생한것을 확인할 수 있다.
지금까지는 Kafka Transaction
동작에 대해서 알아봤다. 그런데 의문점이 있을 것이다. Kafka Transaction
설정 이전에도 이벤트는 발행되고 Spring Transaction
예외 발생시에도 이벤트는 발행된다.
그러하다 어쨌든 이벤트는 무조건 발행되는 것이다. 즉 구독자가 구독하는 방식이 중요하다!!
Consumer Configuration
에 isolation.level
의 값을 read_committed
으로 설정하게 된다면 Kafka Transaction
이 Commit
된 이벤트만 구독하게된다.
이와 같이 Kafka Transaction
를 적용하게 된다면 알아두어야 사항들이 존재한다.
이와 같은 방식으로 Spring Transaction
의 결과에 따라 이벤트의 Transaction
를 처리 할 수 있다.
지금 까지는 Spring Kafka
의 문서에 나와있는 방식이였으며 다음으로 작성할 방법은 현재 회사에서 적용중인 Spring Transaction
결과에 따른 이벤트 발행 방식이다.
현재 우리의 해결과제는 Spring Transaction
이후 Kafka Producer
가 되는 것이다. 그렇다 TransactionalEventListener
를 활용하여 TransactionPhase.AFTER_COMMIT
이후에 kafkaTemplate.send
를 실행시키면 되는것이다.
@Transactional
public void case3() {
User user = new User(UUID.randomUUID().toString(), 1);
entityManager.persist(user);
applicationEventPublisher.publishEvent(new UserEvent(EventType.Created, user.getId()));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void test(UserEvent userEvent) {
kafkaTemplate.send("TEST1", userEvent);
}
위와 같이 로직을 구현한다면 Transaction
의 Commit
이후에만 이벤트가 발행되며 예외가 발생한 경우에는 아예 이벤트가 발행되지 않게된다.
TransactionalEventListener
를 활용하게 된다면 Kafka Transacation
적용시 구독자가 알아두어야할 사항들에 대해서 고려하지 않아도 된다는 장점이 존재한다.
지금까지는 Kafka Producer
에 대해서 정리를 하였다. 다음 글에서는 Kafka Consumer
에 대한 내용을 정리하겠다.