[Kafka] 로컬 환경 클러스터 구축기 feat. Debezium CDC

neo-the-cow·2024년 4월 19일
0

Kafka

목록 보기
1/1
post-thumbnail

0. 들어가기 전에

  • 2024년 4월 드디어 Apache에서 Kafka 도커 이미지를 공식 지원하기 시작했습니다. - 3.7.0 릴리즈 노트
  • 로컬환경에서 사용하던 오픈소스 이미지bitnami/kafkaapache/kafka 이미지로 마이그레이션했습니다.
  • Zookeeper에 의존하지 않는 Kraft 모드로 Kafka Cluster를 구성합니다.
  • docker-compose로 작성하고 컨테이너를 활성화 합니다.

🛠️ 실습 환경

OS: macOS 14.4.1

📝 실습 목표

Apache 공식 Kafka 이미지로 Kraft 모드 Kafka 클러스터 구축하기
부록: Debezium Connector 설정

1. Kafka Cluster 구축하기

1.1 docker-compose.yml 작성하기

  • 로컬 환경에서 실행할 파일 docker-compose-local.yml을 생성하고 다음과 같이 작성합니다.
version: "3.9"

services:

  # ...
  # 다른 서비스들 설정 작성 ex) mysql, mongodb, redis or some others you needed...
  # ...

  # Kafka
  kafka-00:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-00:19092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  kafka-01:
    image: apache/kafka:3.7.0
    ports:
      - "9093:9093"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19093,CONTROLLER://:29093,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-01:19093,EXTERNAL://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  kafka-02:
    image: apache/kafka:3.7.0
    ports:
      - "9094:9094"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19094,CONTROLLER://:29094,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-02:19094,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  # Kafka UI
  # Just for monitoring...
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka-00
      - kafka-01
      - kafka-02
    ports:
      - "9090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-00:19092,kafka-01:19093,kafka-02:19094
    networks:
      - local-infra

networks:
  local-infra:
    driver: bridge

1.1.1 Kafka Environments

  • CLUSTER_ID: Kafka 클러스터의 이름을 지정합니다. 클러스터 내에서 유니크해야합니다.
  • KAFKA_NODE_ID: 카프카 인스턴스의 식별자 입니다. 카프카 클러스터 내에서 유니크해야합니다.
  • KAFKA_CONREOLLER_LISTENER_NAMES: 카프카 컨트롤러의 이름입니다. Kraft모드에서 필수값입니다.
  • KAFKA_CONTROLLER_QUORUM_VOTERS: 동작중인 프라이머리 노드가 죽었을때 투표할 인스턴스를 등록합니다.
  • KAFKA_LISTENERS: 카프카 브로커가 내부적으로 요청을 바인딩하는 주소입니다.
  • KAFKA_ADVERTIESE_LISTENERS: 카프카 브로커를 외부(컨슈머)에 노출 시킬 주소입니다.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 각 리스너에서 통신을 수행할 네트워크 인터페이스를 정의합니다.
  • KAFKA_PROCESS_ROLES: 카프카 인스턴스가 어떤 역할을 수행할 지 정의합니다.

1.1.2 Kafka-ui

  • 브로커의 상태와 토픽, 메시지 등 모니터링의 편의성을 위해 오픈소스 gui 툴을 사용합니다.
  • 필요없다면 정의하지 않아도 됩니다.

1.2 컨테이너 실행

  • 위에서 작성한 docker-compose-local.yml 파일이 있는 디렉토리에서 다음 명령어로 컨테이너를 실행합니다.
% docker-compose -f docker-compose-local.yml up -d
  • 컨테이너가 정상적으로 준비되면 kafka-ui를 통해 상태를 확인 합니다.
  • localhost:9090 또는 kafka-ui를 설정한 주소로 접속합니다.

2. 부록: Debezium Connect

  • 읽기DB쓰기DB를 분리하는 CQRS패턴을 공부하며 두 DB간 데이터 동기화하기위해 CDC를 구축하려했습니다.
  • 쓰기DB의 변경사항을 Kafka Event로 발행하고, 발행된 이벤트를 소비하며 읽기DB에 동기화합니다.
  • Debezium쓰기DB의 변경사항을 실시간으로 감지해 Kafka로 이벤트를 발행합니다.

🛠️ 실습 환경

OS: macOS 14.4.1
쓰기DB: MySQL 8.0.33
CDC 도구: Debezium connect 2.6

📝 실습 목표

Debezium을 활용해 쓰기DB의 변경 사항을 Kafka 이벤트로 발행하는 CDC(Change Data Capture)환경을 구축합니다.

💬 읽기DB로의 동기화 환경은 구축하지 않나요?

  • 읽기DB와 쓰기DB가 이종의 데이터베이스일지라도 같은 테이블구조(스키마)를 가진다면 Debezium Sink Connector로 처리가 가능합니다.
  • 하지만, 예를들어 A, B, C 테이블을 가지는 쓰기DBA-B테이블을 반정규화한 AAC 테이블을 가지는 읽기DB의 경우 변경사항을 조합해 직접 처리해주는 동기화 과정이 필요합니다.
    따라서 이번 포스팅에서는 데이터 변경을 감지하고 Kafka에 발행하는 과정까지만 다루겠습니다.

2.1 데이터 흐름

2.2 docker-compose.yml 작성하기

  • 이전에 작성해둔 docker-compose-local.ymlDebeziumMySQL 컨테이너 내용을 추가로 작성합니다.

  # ...
  # 앞서 작성한 내용들
  # ...

  # MySQL
  mysql:
    image: mysql:8.0.33
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: local
      TZ: UTC
      MYSQL_DATABASE: test
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_0900_ai_ci
    volumes:
      - ./docker-volume/mysql:/var/lib/mysql
    networks:
      - local-infra

  # Debezium
  debezium:
    image: debezium/connect:2.6
    ports:
      - "8083:8083"
    depends_on:
      - mysql
      - kafka-00
      - kafka-01
      - kafka-02
    environment:
      - BOOTSTRAP_SERVERS=kafka-00:19092,kafka-01:19093,kafka-02:19094
      - GROUP_ID=debezium-00
      - CONFIG_STORAGE_TOPIC=DEBEZIUM_CONNECT_CONFIGS
      - OFFSET_STORAGE_TOPIC=DEBEZIUM_CONNECT_OFFSETS
      - STATUS_STORAGE_TOPIC=DEBEZIUM_CONNECT_STATUSES
    networks:
      - local-infra

🚨 주의

Debezium의 포트번호는 8083으로 지정합니다.
다른 포트를 바인딩 하는 경우 DebeziumRest Api가 제대로 동작하지 않는 문제가 있었습니다.
추측컨데 내부적으로 8083포트를 사용하기때문으로 보입니다.
혹시 원인을 알고계시거나 다른 포트로 설정해도 된다면 알려주세요... ㅠㅠ

2.3 Debezium Properties 설정하기

  • /connectors 경로로 POST 요청을 보냅니다.
  • 요청 바디에는 설정을 위한 프로퍼티를 Json형태로 작성합니다.
  • 공식문서를 바탕으로 이해한 각 설정값을 간략히 설명하자면 다음과 같습니다.
    • name: 생성하려는 커넥터의 이름입니다. Debezium내에서 유니크해야합니다.
    • connector.class: 연결에 사용할 커넥터입니다. 지원하는 데이터베이스의 드라이버역할인듯 합니다.
    • database.hostname: Debezium이 변경사항을 감시할 DB의 호스트입니다. 로컬 머신에 직접 설치를 했다면 localhost나 DB가 위치한 엔드포인트 주소를 입력합니다.
    • topic.prefix: 커넥터가 생성할 토픽의 접두어를 설정할 수 있습니다. 토픽 네이밍 규칙은 기본적으로 {prefix}.{database}.{tableName}을 따릅니다.
    • tombstones.on.delete: 데이터 레코드가 삭제될 때 이벤트 메시지 밸류가 빈 메시지를 발행할지 여부를 체크합니다. 기본값은 true입니다.
    • provide.transaction.metadata: 이벤트 메시지를 발행할때 트랜잭션의 메타데이터로 포함해서 발행할 지 여부를 나타냅니다. 기본값은 false입니다.

📌 참고

커넥터 생성 요청이 지연된다면 Kafka 클러스터에서 지연이 발생하고있을 가능성이 큽니다.

2.4 토픽 생성 및 메시지 발행 확인

  • 커넥터까지 생성됐다면 준비가 끝났습니다.
  • 쓰기DB의 변경사항에 따른 이벤트 메시지가 정상적으로 발행되는지 kafka-ui나 여타 도구를 활용해 확인합니다.

3. 맺음말

Apache 공식 Kafka 이미지가 배포되고 이를 활용한 로컬 환경 인프라 구축과 Debezium을 통해 Source DB의 변경사항을 감지하고 Kafka 이벤트 메시지를 발행하는 CDC환경을 구축했습니다.
환경 구축 과정에서 이상한점, 문제점 등 이슈가 발견되거나 잘 안되는 부분이 있면 댓글이나 이메일로 알려주세요. 최대한 빨리 확인해서 반영하고 답변하겠습니다. :)

profile
Hi. I'm Neo

0개의 댓글