본격적으로 Kafka 통신을 위해 먼저 Configuration을 진행하겠다.
일전에 작성한 Kafka 완전정복을 시작으로, Kafka를 활용하기 전에 기본적인 작동원리나 개념들을 이해하는 것이 가장 중요한 단계라 생각하여 환경설정에 대한 내용도 기록해야 겠다는 생각을 하였다.
이번의 Configuration 관련 내용도 단순히 지나치기엔 핵심적인 내용이 많아 이 글을 작성하게 되었다.
특히 이번엔 트러블 슈팅이나 개선의 관점보다는 단순히 개념학습적인 부분이 크지만, 향후 Kafka의 활용도를 높여 Architecturing의 수준을 높일 수 있을 것으로 생각하여 최대한 깊게 학습하였고 이에 대해 정리한 내용을 기록한다.
인기글을 선정하여 Client에게 보여주는 "인기글 집계 및 조회"를 위해 hot-article이라는 책임을 별도로 만들어주었다.
기존 Monolithic 구조였다면 단순 hot-article 도메인 추가 및 application.yml 및 build.gradle에 관련 설정 및 의존성을 기재해주었겠지만, 현재의 구조는 각기 다른 Database를 바라보고 별도의 모듈로 구성된 MSA구조이기에 해당 책임을 전임하여 Kafka 기본 설정부터 활용까지 핵심 설정 정보까지 보유할 수 있도록 고려하여 도메인을 구성해주었다.
이처럼 hot-article 도메인을 구성해주었고, Kafka 기본 설정부터 일전에 작성한 Event 객체 등에 대한 책임을 모두 이 도메인에 위임한다.
이제 Kafka 통신을 위한 기본적인 설정정보를 기재하여야 한다.
server의 port 및 application root name 등, Kafka 설정 정보를 제외한 항목들은 여기선 따로 다루지는 않겠다.
server.port: 9004
spring:
application:
name: kuke-board-hot-article-service
data:
redis:
host: 127.0.0.1
port: 6379
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: kuke-board-hot-article-service #인기글 선정 병렬 처리를 위한 group id 지정
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka 레벨 통신 시 직렬화
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka 레벨 통신 시 역직렬화
enable-auto-commit: false #다음에 읽을 offset 자동 commit 여부(메시지 유실 위험이 있어 false 설정)
endpoints:
kuke-board-article-service:
url: http://127.0.0.1:9000 #실제 원본 데이터를 받아 세부내용을 추출하기 위한 API URL(end point)
여기서 알아두어야할 개념은 바로 Consumer이다.
먼저, consumer group id를 지정해주는데 1 topic에 여러 partition이 존재할 수 있고, 이 여러 partition들에 대해 offset 위치를 공유하면서 병렬처리를 할 수 있도록 설정해줄 수 있는 포인터이다.
group-id
는 Consumer Group을 식별하는 이름으로, 해당 Consumer는 특정 topic에 대한 partition을 바라보면서 메시지를 읽고 처리하게 된다.정리하면, group-id
를 지정한다는 건 "내 Consumer는 이 그룹의 일원으로 메시지를 소비하겠다"는 선언이기도 하다.
Kafka 내부 동작과정과 연결하기 위해, Consumer Group 동작 방식에 대해 깊게 살펴보도록 하자.
하나의 그룹 = 논리적으로 "하나의 Subscriber"
Offset 관리도 그룹 단위
group-id
별로 관리하며, 반대로 말하면 group-id가 동일한 consumer들은 consumer group 단위로 offset을 공유하면서 partition별 병렬처리를 진행한다.병렬 처리
순서 보장
consumer group id는 메시지를 처리하고 데이터를 구독하기 위한 기본 설정으로, 물리적으로나 논리적으로나 반드시 지정해주어야 하는 작업이다.
group-id
를 지정하지 않으면, Consumer는 Anonymous Consumer처럼 동작한다.article-events
(Partition = 3) 라는 데이터가 Kafka Broker에 있다고 해보자.Case 1. group-id 동일하다면?
group-id = hot-article-service
Consumer1, Consumer2, Consumer3
-> Consumer1,2,3은 Partition 0, 1, 2에 나눠진 메시지들을 각각 도맡아 병렬 처리한다
-> 1 Consumer = 1 Partition(Kafka는 Partition 별로 나누어 Consumer들에게 메시지 처리를 위임한다.
Case 2. group-id 다르다면?
Consumer1 (group-id=hot-article-service)
Consumer2 (group-id=analytics-service)
-> 두 Consumer Group이 각각 모든 Partition을 다 읽는다.
-> 서로 독립적인 Subscriber 역할을 하여, Group id가 달라도 동일한 topic을 구독하며 이때 위임받은 partition들을 consumer가 메시지를 처리한다.
-> 동일한 데이터이지만 서로 처리하는 목적이 다를 수 있다.
consumer group id에 대한 설정 정보를 아래와 같이 정리해볼 수 있겠다.
group-id
는 Consumer가 속한 그룹을 식별하는 값이다.group-id
안에서는 Partition을 나눠서 병렬처리하고, 순서보장이 필요하다면 1 Consumer가 1 Partition에 적재된 메시지들을 순차적으로 처리한다.group-id
를 쓰면 완전히 독립적인 구독자로 동작하여, 보통은 메시지가 같아도(topic) 서로 다른 목적의 처리를 진행할때 group id를 다르게 지정해준다.group-id
가 없으면 메시지 재처리, 유실 방지 같은 관리가 불가능하기에 반드시 설정해주어야 한다.consumer가 메시지를 읽을때 파티션 로그 파일에 있는 byte[] 파일을 바로 읽을 수 없으므로, Kafka level에서 String으로 1차적으로 역직렬화해주는 작업이 application.yml에서 설정한 String Deserializer이다.
이후에 이 String 문자열을 받아와서 Object Mapper의 Event 객체 형태로 2차 역직렬화하는 과정이 있는데, 이것은 common util모듈로 지정한 Data Serializer/Deserializer를 통해 진행하는 것이다.
위에서 볼 수 있듯이 직렬화/역직렬화를 단순히 Object <-> String의 일방향적 전환 과정이 아니라, appliction level과 Kafka level을 각각 나누어 데이터 혹은 메시지를 어떻게 전환하는지 기준점을 잘 구분해주는 것이 좋겠다.
간단하게 살펴보자면 각 Kafka/application level별로 직렬화/역직렬화하는 기준이 다르므로 무조건적인 String화 = 직렬화라 보면 곤란하다.
이와 같이 각각의 기준에서, 최초의 형태(상태)가 다르므로 직렬화 및 역직렬화를 지칭하는 개념 및 방향이 다름에 유의한다.
비즈니스 이벤트 객체(ArticleEvent)를 만들었을 때:
ArticleLikedEvent event = new ArticleLikedEvent(articleId, userId);
이걸 Kafka로 보내려면, JSON 같은 문자열로 변환이 필요하다(=application 입장에선 최초 형태가 Object 형태이므로 직렬화).
String payload = objectMapper.writeValueAsString(event); // Object -> JSON String
이게 우리가 일반적으로 말하는 직렬화(JSON 직렬화)이고, 반대로 Consumer에서 메시지를 읽기 위해 Kafka에서 1차적으로 역직렬화한 문자열을 전송받아, JSON String → Event 객체로 바꾸는 건 역직렬화이다.
Kafka는 네트워크로 byte 배열만 전송할 수 있기에, 내부적으로 최초 형태를 byte 배열 형태로 간주한다.
따라서 Producer가 Kafka에 메시지를 보내든, 이 메시지를 파티션 로그 파일에 적재하든 최종적으로는 반드시 byte[] 이어야 한다.
그래서 Kafka는 추가로 Serializer / Deserializer 인터페이스가 필요한 것이고, kafka 통신에서는 byte[]를 최초의 형태로 간주하여 직렬화/역직렬화도 이에 맞게 매핑하여 이해해야 하겠다.
즉, Kafka 통신 레벨에서의 직렬화는 네트워크 전송용 byte 변환이다.
StringSerializer는 JSON String → UTF-8 인코딩된 byte[]
StringDeserializer는 byte[] → JSON String
실시간 처리가 필요할떄는 스트리밍이 필요하고, 반대로 실시간 처리보다는 안정적인 데이터 추출 등의 작업이 필요하거나 트래픽이 그리 크지 않을 경우에는 endpoint를 활용한 API 정보를 기재해야 한다.
위 그림처럼 Kafka 및 Redis에는 필요한 최소한의 데이터(주로 Id값)을 담는다.
endpoints.kuke-board-article-service.url은 "인기글 서비스"가 원본 데이터를 가져올 대상 서비스 주소에 대한 내용이고, 보통 Redis와 마찬가지로, Kafka 이벤트에 들어오는 건 보통 이벤트에 필요한 최소 데이터이기에 별도로 원본 데이터를 추출하기 위한 API 통신 정보가 필요하다.
예: "articleId": 123, "userId": 456, "action": "LIKE_ADDED"
따라서,
Kafka 이벤트를 소비해서 어떤 글이 핫해졌다는 건 알 수 있지만,
실제로 인기글 목록을 구성하려면 해당 글의 상세 데이터가 필요하므로, 그걸 게시글 서비스 API를 호출해서 가져오겠다는 의미이다.
즉, endpoints.kuke-board-article-service.url=http://127.0.0.1:9000로 설정한 것은, "인기글 서비스"는 여기 있는 게시글 서비스 REST API를 통해 원본 데이터를 조회한다는 의미이다.
Kafka의 commit은 Consumer가 메시지를 읽은 후에, 자신이 다음에 처리할 메시지의 위치 포인터인 offset을 갱신하여 기억하는 과정을 의미한다.
이 offset은 topic 내부의 __consumer_offsets에 저장하기에, 동일한 topic을 바라보는 consumer들은 이 offset 정보를 서로 공유한다.
Offset은 너무나도 중요한 개념이기에 다시 정리해보겠다.
이 Offset을 어떻게 Commit(기록)할지 정하는 게 enable-auto-commit이다.
이 offset을 기억하는 시점의 기본값은 주기적으로 메시지를 읽기 위해 poll하는 시점(메시지를 읽는 시점도 아니다)으로, 처리 이후의 시점이 아니기에 데이터 유실을 대비한다면 auto commit을 false로 하는 것이 중요하겠다.
Consumer가 메시지를 poll() 할 때마다 백그라운드에서 일정 주기(auto.commit.interval.ms, 기본 5초)로 자동 커밋한다.
치명적으로, 실제 비즈니스 로직이 끝나기 전에 Offset이 커밋될 수 있으므로, 장애 발생 시 읽지 못한 메시지는 그대로 유실한다.
즉, 처리해야 할 메시지는 아직 DB에 안 저장됐는데 Offset은 커밋한 경우, Consumer를 재시작하면 해당 메시지는 다시 안 읽기에 그대로 유실된다(데이터 유실).
보통은 자동 커밋을 끄고, 애플리케이션이 직접 commitSync() 또는 commitAsync()를 호출하도록 하여 메시지를 읽고 데이터 처리가 모두 끝난 이후에 offset commit을 하는 전략을 주로 채택한다.
그렇기에 메시지 처리 로직이 끝난 후 커밋하도록 제어 가능하여, 최소한의 "처리 완료 보장"을 확보할 수 있고 장애 상황에서도 메시지 유실 가능성을 크게 줄일 수 있겠다.
단 중복 처리(한 메시지를 두 번 처리) 가능성 있는 점만 유의하여 처리한다.
Kafka Config파일을 바탕으로 application level에서 Spring이 이를 인식하여 Factory 및 Container를 실행해주고, 최종적으로 Kafka와 통신할 수 있도록 해줄 수 있다.
@Configuration
public class KafkaConfig {
/*
* 메시지를 소비하는 Consumer를 구성하고 동작하는 전략을 구성해줄 수 있는 설정 클래스.
* ConsumerFactory: application.yml 기반으로 KafkaConsumer 인스턴스를 만들어주는 설정
* ConcurrentKafkaListenerContainerFactory: Consumer를 실행하는 ListenerContainer를 생성하는 공장 → 동작 전략(AckMode, concurrency 등)을 커스터마이징할 수 있음.
* KafkaConfig 클래스: 이 Factory를 Bean으로 등록해, @KafkaListener가 어떤 실행 전략으로 메시지를 소비할지 지정.
* */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); //manual commit
return factory;
}
}
이때 return값이 factory인데, factory는 Spring Application 측에서 Consumer 인스턴스를 생성하여 Kafka로부터 메시지를 읽고 처리하기 위한 하나의 생성인자라 할 수 있겠다.
여기서 application.yml의 설정 값들 (bootstrap-servers, group-id, key-deserializer, value-deserializer, enable-auto-commit 등)을 읽어오고, 최종적으로 application.yml 기반으로 KafkaConsumer를 만들어 주는 클래스이다.
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
용어가 생소하고 어려운데, 풀어 쓰자면
이 Factory는 @KafkaListener 애노테이션이 붙은 메서드가 실행될 때 필요한 Consumer Container를 만들어주는 (말그대로) 공장이고, consumerFactory 파라미터는 이미 스프링이 application.yml의 설정정보를 참조한다.
위에서 보면 알겠지만, ContainerFactory를 만들기 위해서는 ConsumerFactory 인자가 반드시 필요하다.
위에서 application.yml 정보를 참조하여 consumerfactory를 만든다고 하였는데, 말그대로 Consumer 인스턴스를 생성하기 위한 실체라 볼 수 있다.
참고로
의 두가지 과정으로 consumer 정보를 읽어오고 commit 방법을 정한다.
이때 commit 방법의 경우, 기본적으로 auto commit이 false로 되어있어야 세부적인 ackMode 지정이 가능하다.
(참고로 application.yml에서 auto-commit = true일 경우, Kafka 네이티브 Consumer가 poll하는 시점에 바로 commit을 진행하므로 위 설정은 의미가 없어진다.)
이에 대한 이해를 바탕으로 Kafka Consumer 구성과정을 크게 3가지로 구분할 수 있겠다.
ConsumerFactory
→ "Kafka Consumer 객체를 어떻게 만들 건지" 정의 (application.yml 기반 설정을 읽어 실제 KafkaConsumer 인스턴스를 생성)
KafkaListenerContainer (Container)
→ Consumer를 감싸고 메시지를 지속적으로 poll 하면서, Consumer/리스너(@KafkaListener)에 메시지를 전달하는 실행 환경, 결국엔 Consumer 그 자체라고 생각하면 편하다.
ConcurrentKafkaListenerContainerFactory
→ ListenerContainer를 만드는 "공장(Factory)"
→ 병렬성(스레드 수), AckMode(커밋 방식) 같은 동작 전략을 여기서 지정