[Java] Application level에서 Kafka Configuration(*직렬화/역직렬화부터 Application/Kafka level 통신 및 이를 위해 필요한 Application level Configuration 관련 개념들)

Hyo Kyun Lee·4일 전
0

Java

목록 보기
105/106

1. 개요

본격적으로 Kafka 통신을 위해 먼저 Configuration을 진행하겠다.

일전에 작성한 Kafka 완전정복을 시작으로, Kafka를 활용하기 전에 기본적인 작동원리나 개념들을 이해하는 것이 가장 중요한 단계라 생각하여 환경설정에 대한 내용도 기록해야 겠다는 생각을 하였다.

이번의 Configuration 관련 내용도 단순히 지나치기엔 핵심적인 내용이 많아 이 글을 작성하게 되었다.

특히 이번엔 트러블 슈팅이나 개선의 관점보다는 단순히 개념학습적인 부분이 크지만, 향후 Kafka의 활용도를 높여 Architecturing의 수준을 높일 수 있을 것으로 생각하여 최대한 깊게 학습하였고 이에 대해 정리한 내용을 기록한다.

2. hot-article 도메인 구성

인기글을 선정하여 Client에게 보여주는 "인기글 집계 및 조회"를 위해 hot-article이라는 책임을 별도로 만들어주었다.

기존 Monolithic 구조였다면 단순 hot-article 도메인 추가 및 application.yml 및 build.gradle에 관련 설정 및 의존성을 기재해주었겠지만, 현재의 구조는 각기 다른 Database를 바라보고 별도의 모듈로 구성된 MSA구조이기에 해당 책임을 전임하여 Kafka 기본 설정부터 활용까지 핵심 설정 정보까지 보유할 수 있도록 고려하여 도메인을 구성해주었다.

이처럼 hot-article 도메인을 구성해주었고, Kafka 기본 설정부터 일전에 작성한 Event 객체 등에 대한 책임을 모두 이 도메인에 위임한다.

3. application.yml

이제 Kafka 통신을 위한 기본적인 설정정보를 기재하여야 한다.

server의 port 및 application root name 등, Kafka 설정 정보를 제외한 항목들은 여기선 따로 다루지는 않겠다.

server.port: 9004
spring:
  application:
    name: kuke-board-hot-article-service
  data:
    redis:
      host: 127.0.0.1
      port: 6379
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: kuke-board-hot-article-service #인기글 선정 병렬 처리를 위한 group id 지정
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka 레벨 통신 시 직렬화
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka 레벨 통신 시 역직렬화
      enable-auto-commit: false #다음에 읽을 offset 자동 commit 여부(메시지 유실 위험이 있어 false 설정)
endpoints:
  kuke-board-article-service:
    url: http://127.0.0.1:9000 #실제 원본 데이터를 받아 세부내용을 추출하기 위한 API URL(end point)

여기서 알아두어야할 개념은 바로 Consumer이다.

3-1. consumer group id

먼저, consumer group id를 지정해주는데 1 topic에 여러 partition이 존재할 수 있고, 이 여러 partition들에 대해 offset 위치를 공유하면서 병렬처리를 할 수 있도록 설정해줄 수 있는 포인터이다.

  • group-idConsumer Group을 식별하는 이름으로, 해당 Consumer는 특정 topic에 대한 partition을 바라보면서 메시지를 읽고 처리하게 된다.
  • 즉 Consumer group id를 지정하는 것은 메시지를 처리하기 위한 논리적인 구분 단위를 지정해주는 것과 마찬가지이며, Kafka는 Consumer Group 단위로 메시지를 분배(Load Balancing) 하고, Offset을 관리한다.

정리하면, group-id를 지정한다는 건 "내 Consumer는 이 그룹의 일원으로 메시지를 소비하겠다"는 선언이기도 하다.

3-1-1. Consumer Group의 동작 방식

Kafka 내부 동작과정과 연결하기 위해, Consumer Group 동작 방식에 대해 깊게 살펴보도록 하자.

  1. 하나의 그룹 = 논리적으로 "하나의 Subscriber"

    • 같은 group-id에 속한 Consumer들은 협력해서 메시지를 나눠 읽는다.
    • 즉, 그룹 전체가 합쳐져서 "이 토픽은 내가 처리할게"라고 하는 구조, 결국 consumer group id를 지정하여 구독자를 설정해주므로 Kafka를 활용하기 위해 반드시 해주어야 하는 기본 설정.
  2. Offset 관리도 그룹 단위

    • Offset은 group-id 별로 관리하며, 반대로 말하면 group-id가 동일한 consumer들은 consumer group 단위로 offset을 공유하면서 partition별 병렬처리를 진행한다.
    • 같은 토픽이라도 group-id가 다르면 서로 다른 "구독자"처럼 동작하고, 메시지를 각각 읽을 수 있어.
  3. 병렬 처리

    • 한 그룹 안에 Consumer가 여러 개 있으면, Kafka는 Partition 단위로 Consumer에게 메시지를 나눠줌.
    • 즉, Partition 수 ≤ Consumer 수 까지는 병렬처리가 가능해지므로 고가용성 측면에서 유리하다.
    • 하지만 Partition 수 < Consumer 수라면 일부 Consumer는 유휴상태이므로 효율적이지 못하다.
  4. 순서 보장

    • 순서 보장을 위해선 순서보장이 필요한 처리를 하나의 partition 로그 파일에 적재하여 한 consumer가 이를 처리하도록 해야 한다(여러 consumer가 하나의 partition을 바라보지는 않고 유휴상태에서 존재한다).

3-1-2. group-id를 지정하지 않으면?

consumer group id는 메시지를 처리하고 데이터를 구독하기 위한 기본 설정으로, 물리적으로나 논리적으로나 반드시 지정해주어야 하는 작업이다.

  • group-id를 지정하지 않으면, Consumer는 Anonymous Consumer처럼 동작한다.
  • 즉, Offset 관리가 안 되고, 매번 처음부터 읽어야 하거나 임시 설정을 따라가므로 항상 명시적으로 group-id를 주는 것이 맞다(즉, group id를 지정해주지 않는다면 의미가 없다).

3-1-3. consumer group id는 구독/처리 목적을 나타낼 수 있는 정보이기도 하다.

  • 토픽: article-events (Partition = 3) 라는 데이터가 Kafka Broker에 있다고 해보자.

Case 1. group-id 동일하다면?

group-id = hot-article-service
Consumer1, Consumer2, Consumer3

-> Consumer1,2,3은 Partition 0, 1, 2에 나눠진 메시지들을 각각 도맡아 병렬 처리한다
-> 1 Consumer = 1 Partition(Kafka는 Partition 별로 나누어 Consumer들에게 메시지 처리를 위임한다.

Case 2. group-id 다르다면?

Consumer1 (group-id=hot-article-service)
Consumer2 (group-id=analytics-service)

-> 두 Consumer Group이 각각 모든 Partition을 다 읽는다.
-> 서로 독립적인 Subscriber 역할을 하여, Group id가 달라도 동일한 topic을 구독하며 이때 위임받은 partition들을 consumer가 메시지를 처리한다.
-> 동일한 데이터이지만 서로 처리하는 목적이 다를 수 있다.

3-1-4. Consumer group id 설정 정보 관련 정리

consumer group id에 대한 설정 정보를 아래와 같이 정리해볼 수 있겠다.

  • group-id는 Consumer가 속한 그룹을 식별하는 값이다.
  • 같은 group-id 안에서는 Partition을 나눠서 병렬처리하고, 순서보장이 필요하다면 1 Consumer가 1 Partition에 적재된 메시지들을 순차적으로 처리한다.
  • 다른 group-id를 쓰면 완전히 독립적인 구독자로 동작하여, 보통은 메시지가 같아도(topic) 서로 다른 목적의 처리를 진행할때 group id를 다르게 지정해준다.
  • Offset도 그룹 단위로 관리되므로, group-id가 없으면 메시지 재처리, 유실 방지 같은 관리가 불가능하기에 반드시 설정해주어야 한다.

3-2. key/value deserializer

consumer가 메시지를 읽을때 파티션 로그 파일에 있는 byte[] 파일을 바로 읽을 수 없으므로, Kafka level에서 String으로 1차적으로 역직렬화해주는 작업이 application.yml에서 설정한 String Deserializer이다.

이후에 이 String 문자열을 받아와서 Object Mapper의 Event 객체 형태로 2차 역직렬화하는 과정이 있는데, 이것은 common util모듈로 지정한 Data Serializer/Deserializer를 통해 진행하는 것이다.

3-2-1. 직렬화와 역직렬화

위에서 볼 수 있듯이 직렬화/역직렬화를 단순히 Object <-> String의 일방향적 전환 과정이 아니라, appliction level과 Kafka level을 각각 나누어 데이터 혹은 메시지를 어떻게 전환하는지 기준점을 잘 구분해주는 것이 좋겠다.

간단하게 살펴보자면 각 Kafka/application level별로 직렬화/역직렬화하는 기준이 다르므로 무조건적인 String화 = 직렬화라 보면 곤란하다.

  • Kafka 레벨의 직렬화/역직렬화: byte[] ↔ String (네트워크 통신용)
  • 애플리케이션 레벨의 직렬화/역직렬화: String(JSON) ↔ Event 객체 (ObjectMapper)

이와 같이 각각의 기준에서, 최초의 형태(상태)가 다르므로 직렬화 및 역직렬화를 지칭하는 개념 및 방향이 다름에 유의한다.

3-2-2. 구체적인 예시를 통해 직렬화/역직렬화 과정 확실히 이해하기

  1. 애플리케이션 레벨 (공통 util로 구현한 DataSerializer)

비즈니스 이벤트 객체(ArticleEvent)를 만들었을 때:

ArticleLikedEvent event = new ArticleLikedEvent(articleId, userId);

이걸 Kafka로 보내려면, JSON 같은 문자열로 변환이 필요하다(=application 입장에선 최초 형태가 Object 형태이므로 직렬화).

String payload = objectMapper.writeValueAsString(event); // Object -> JSON String

이게 우리가 일반적으로 말하는 직렬화(JSON 직렬화)이고, 반대로 Consumer에서 메시지를 읽기 위해 Kafka에서 1차적으로 역직렬화한 문자열을 전송받아, JSON String → Event 객체로 바꾸는 건 역직렬화이다.

  1. Kafka 통신 레벨 (네트워크 I/O 직렬화)

Kafka는 네트워크로 byte 배열만 전송할 수 있기에, 내부적으로 최초 형태를 byte 배열 형태로 간주한다.

따라서 Producer가 Kafka에 메시지를 보내든, 이 메시지를 파티션 로그 파일에 적재하든 최종적으로는 반드시 byte[] 이어야 한다.

그래서 Kafka는 추가로 Serializer / Deserializer 인터페이스가 필요한 것이고, kafka 통신에서는 byte[]를 최초의 형태로 간주하여 직렬화/역직렬화도 이에 맞게 매핑하여 이해해야 하겠다.

  • Producer: StringSerializer을 통해, JSON String -> byte[]로 변환
  • Broker & Consumer: StringDeserializer을 통해, byte[] -> JSON String 복원

즉, Kafka 통신 레벨에서의 직렬화는 네트워크 전송용 byte 변환이다.
StringSerializer는 JSON String → UTF-8 인코딩된 byte[]
StringDeserializer는 byte[] → JSON String

3-3. 다른 도메인과 통신하는 방법 - endpoint

실시간 처리가 필요할떄는 스트리밍이 필요하고, 반대로 실시간 처리보다는 안정적인 데이터 추출 등의 작업이 필요하거나 트래픽이 그리 크지 않을 경우에는 endpoint를 활용한 API 정보를 기재해야 한다.

위 그림처럼 Kafka 및 Redis에는 필요한 최소한의 데이터(주로 Id값)을 담는다.

endpoints.kuke-board-article-service.url은 "인기글 서비스"가 원본 데이터를 가져올 대상 서비스 주소에 대한 내용이고, 보통 Redis와 마찬가지로, Kafka 이벤트에 들어오는 건 보통 이벤트에 필요한 최소 데이터이기에 별도로 원본 데이터를 추출하기 위한 API 통신 정보가 필요하다.

예: "articleId": 123, "userId": 456, "action": "LIKE_ADDED"

따라서,

Kafka 이벤트를 소비해서 어떤 글이 핫해졌다는 건 알 수 있지만,
실제로 인기글 목록을 구성하려면 해당 글의 상세 데이터가 필요하므로, 그걸 게시글 서비스 API를 호출해서 가져오겠다는 의미이다.

즉, endpoints.kuke-board-article-service.url=http://127.0.0.1:9000로 설정한 것은, "인기글 서비스"는 여기 있는 게시글 서비스 REST API를 통해 원본 데이터를 조회한다는 의미이다.

3-4. offset 유실방지를 위한 정책 - auto-commit = false

Kafka의 commit은 Consumer가 메시지를 읽은 후에, 자신이 다음에 처리할 메시지의 위치 포인터인 offset을 갱신하여 기억하는 과정을 의미한다.

이 offset은 topic 내부의 __consumer_offsets에 저장하기에, 동일한 topic을 바라보는 consumer들은 이 offset 정보를 서로 공유한다.

3-4-1. Offset 개념 재정비하기

Offset은 너무나도 중요한 개념이기에 다시 정리해보겠다.

  • Kafka에서 Consumer가 특정 Partition에서 메시지를 어디까지 읽었는지 표시하는 "위치 포인터"(*Consumer은 Partition 단위로 메시지를 읽고 처리하므로)이다.
  • Offset = Consumer가 다음에 읽을 메시지의 위치, 참고로 Offset은 __consumer_offsets 라는 내부 토픽에 저장하며, topic 단위로 consumer group id를 논리적으로 구분하므로 consumer group의 consumer는 offset 정보를 공유한다.

이 Offset을 어떻게 Commit(기록)할지 정하는 게 enable-auto-commit이다.

3-4-2. auto commit = false의 중요성

이 offset을 기억하는 시점의 기본값은 주기적으로 메시지를 읽기 위해 poll하는 시점(메시지를 읽는 시점도 아니다)으로, 처리 이후의 시점이 아니기에 데이터 유실을 대비한다면 auto commit을 false로 하는 것이 중요하겠다.

  • enable-auto-commit: true (기본값)

Consumer가 메시지를 poll() 할 때마다 백그라운드에서 일정 주기(auto.commit.interval.ms, 기본 5초)로 자동 커밋한다.

치명적으로, 실제 비즈니스 로직이 끝나기 전에 Offset이 커밋될 수 있으므로, 장애 발생 시 읽지 못한 메시지는 그대로 유실한다.

즉, 처리해야 할 메시지는 아직 DB에 안 저장됐는데 Offset은 커밋한 경우, Consumer를 재시작하면 해당 메시지는 다시 안 읽기에 그대로 유실된다(데이터 유실).

  • enable-auto-commit: false

보통은 자동 커밋을 끄고, 애플리케이션이 직접 commitSync() 또는 commitAsync()를 호출하도록 하여 메시지를 읽고 데이터 처리가 모두 끝난 이후에 offset commit을 하는 전략을 주로 채택한다.

그렇기에 메시지 처리 로직이 끝난 후 커밋하도록 제어 가능하여, 최소한의 "처리 완료 보장"을 확보할 수 있고 장애 상황에서도 메시지 유실 가능성을 크게 줄일 수 있겠다.

단 중복 처리(한 메시지를 두 번 처리) 가능성 있는 점만 유의하여 처리한다.

4. Kafka Config

Kafka Config파일을 바탕으로 application level에서 Spring이 이를 인식하여 Factory 및 Container를 실행해주고, 최종적으로 Kafka와 통신할 수 있도록 해줄 수 있다.

@Configuration
public class KafkaConfig {
    /*
    * 메시지를 소비하는 Consumer를 구성하고 동작하는 전략을 구성해줄 수 있는 설정 클래스.
    * ConsumerFactory: application.yml 기반으로 KafkaConsumer 인스턴스를 만들어주는 설정
    * ConcurrentKafkaListenerContainerFactory: Consumer를 실행하는 ListenerContainer를 생성하는 공장 → 동작 전략(AckMode, concurrency 등)을 커스터마이징할 수 있음.
    * KafkaConfig 클래스: 이 Factory를 Bean으로 등록해, @KafkaListener가 어떤 실행 전략으로 메시지를 소비할지 지정.
    * */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); //manual commit
        return factory;
    }
}

이때 return값이 factory인데, factory는 Spring Application 측에서 Consumer 인스턴스를 생성하여 Kafka로부터 메시지를 읽고 처리하기 위한 하나의 생성인자라 할 수 있겠다.

여기서 application.yml의 설정 값들 (bootstrap-servers, group-id, key-deserializer, value-deserializer, enable-auto-commit 등)을 읽어오고, 최종적으로 application.yml 기반으로 KafkaConsumer를 만들어 주는 클래스이다.

4-1. KafkaListenerContainerFactory

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

용어가 생소하고 어려운데, 풀어 쓰자면

  • Concurrent → 병렬성 지원 (멀티 스레드(여러 Consumer들이) 기반으로 여러 Partition을 동시에 소비 가능(다수의 Consumer 인스턴스를 생성한다는 의미))
  • KafkaListenerContainer → 메시지를 poll 하여 @KafkaListener에 대해 필요한 메시지를 처리하기 위한 Consumer 인스턴스 생성 환경(Container)를 생성하며, application.yml에서 생성정보를 그대로 참조한다.
  • Factory → 이 Container를 생성하는 공장으로, Consumer 인스턴스가 생성되는 시점은 이벤트가 발생하여 Consumer가 이를 poll 및 감지하였을 시점이다.

이 Factory는 @KafkaListener 애노테이션이 붙은 메서드가 실행될 때 필요한 Consumer Container를 만들어주는 (말그대로) 공장이고, consumerFactory 파라미터는 이미 스프링이 application.yml의 설정정보를 참조한다.

4-2. ConsumerFactory

위에서 보면 알겠지만, ContainerFactory를 만들기 위해서는 ConsumerFactory 인자가 반드시 필요하다.

위에서 application.yml 정보를 참조하여 consumerfactory를 만든다고 하였는데, 말그대로 Consumer 인스턴스를 생성하기 위한 실체라 볼 수 있다.

참고로

  • factory.setConsumerFactory(consumerFactory);
  • factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

의 두가지 과정으로 consumer 정보를 읽어오고 commit 방법을 정한다.

이때 commit 방법의 경우, 기본적으로 auto commit이 false로 되어있어야 세부적인 ackMode 지정이 가능하다.

  • RECORD: 각 레코드 처리 직후 커밋
  • BATCH: poll() 해서 가져온 레코드 배치 단위로 커밋
  • TIME: 일정 시간마다 커밋
  • COUNT: 일정 개수마다 커밋
  • MANUAL: 직접 Acknowledgment.acknowledge() 호출해야 커밋
  • MANUAL_IMMEDIATE: acknowledge() 호출 즉시 커밋

(참고로 application.yml에서 auto-commit = true일 경우, Kafka 네이티브 Consumer가 poll하는 시점에 바로 commit을 진행하므로 위 설정은 의미가 없어진다.)

4-3. Consume를 만드는 과정 정리 - Kafka Consumer 생성은 3개의 layer로 구성할 수 있다.

이에 대한 이해를 바탕으로 Kafka Consumer 구성과정을 크게 3가지로 구분할 수 있겠다.

ConsumerFactory
→ "Kafka Consumer 객체를 어떻게 만들 건지" 정의 (application.yml 기반 설정을 읽어 실제 KafkaConsumer 인스턴스를 생성)

KafkaListenerContainer (Container)
→ Consumer를 감싸고 메시지를 지속적으로 poll 하면서, Consumer/리스너(@KafkaListener)에 메시지를 전달하는 실행 환경, 결국엔 Consumer 그 자체라고 생각하면 편하다.

ConcurrentKafkaListenerContainerFactory
→ ListenerContainer를 만드는 "공장(Factory)"
→ 병렬성(스레드 수), AckMode(커밋 방식) 같은 동작 전략을 여기서 지정

0개의 댓글