Kafka Connect

박태현·2025년 5월 20일

Kafka Connect


Kafka Connect는 Kafka Connector를 실행하는 프레임워크이자, 외부 시스템과 Kafka를 연결해주는 플랫폼

Kafka Cluster의 일부가 아니며, Kafka 외부에서 실행되는 독립적인 서비스입니다.

Kafka Connect의 구성 요소

  • Worker : Connector를 실행하고 관리하는 프로세스 ( Kafka Connect를 실제로 실행하는 서버 프로세스 )

  • Source Connector : 외부 시스템( DB 등 )에서 데이터를 읽어와 Kafka 토픽으로 Publish 합니다. ex ) Debezium

  • Sink Connector : Kafka 토픽에서 데이터를 구독하여 외부 시스템으로 Consume 합니다.

Kafka Connector

Kafka Connect에서 실제로 데이터를 가져오거나 보내는 동작을 수행하는 플러그인 ( 구현체 )

즉, 데이터를 어디서 가져오고 어디로 보낼지를 정의한 도구

Kafka Connect의 구조


Kafka Connect

Kafka Connect는 클러스터 형태로 동작하며, 그 안에는 여러 Worker가 존재하고, 각 Worker는 Connector에 속하는 Task를 사용하여 작업을 실행합니다.

각 Worker에는 Source Connector와 Sink Connector가 존재하고, 각 Connector는 실제 작업을 수행하는 여러 Task를 포함하며, 같은 Connector가 만든 여러 Task를 여러 Worker가 나누어 실행합니다.

이를 통해 부하를 분산하며, 특정 Worker에 장애가 발생하더라도 대비할 수 있습니다.

즉, 작업을 실제로 수행하는 실행 단위는 Connector에 속한 여러 Task이며, 각 Task는 논리적으로 동일한 Connector에 포함되지만, 실행될 때는 여러 Worker에 분산 배치되어 병렬로 처리됩니다.

Kafka Connector

데이터 이동 작업을 정의하는 설정 개체이며, 데이터를 실질적으로 처리하는 task를 관리합니다.

  • Source Connector : 외부 시스템의 데이터를 Kafka로 전송하는 Producer 역할을 하는 커넥터

  • Sink Connector : Kafka에 존재하는 데이터를 외부 시스템으로 내보내는 Consumer 역할을 하는 커넥터

Kafka Connect의 내부 동작 원리


데이터 처리가 시작되면 워커에 배치된 Connector가 해당 작업에 필요한 여러 Task를 생성하고, 이 Task들이 실제 파이프라인을 실행합니다.

각 Task는 외부 시스템에서 데이터를 읽거나 쓰는 과정에서 Converter를 통해 메시지를 변환한 뒤 Kafka로 전송을 수행합니다.

ex ) Debezium MySQL Connector에 의한 MySQL → Kafka 이벤트 발행 흐름

MySQL ( binlog ) → Debezium MySQL Connector → Kafka Connect Worker → Kafka Topic → Consumer

  1. MySQL ( binlog )에서 변경이 발생 ( INSERT, UPDATE, DELETE )

    Debezium은 쿼리를 직접 polling 하지 않고, binlog를 구독하여 변경을 읽습니다.

    ⇒ DB의 부하 적고, 실시간 반영이 가능하며 트랜잭션의 순서를 보장합니다.

  2. Debezium MySQL Connector가 binlog 읽습니다.

    Debezium MySQL Connector가 Kafka Connect Worker에 의해 실행되며 MySQL binlog를 지속적으로 tailing하여 변경사항을 추출하고 Debezium 이벤트 구조로 변환합니다.

  3. Kafka Connect → Kafka Topic에 이벤트 발행

  4. Kafka Consumer가 이벤트를 읽어서 발행 처리

Kafka Connect의 모드


  • Standalone Mode ( 단일 모드 )

    Worker가 1개만 실행되는 방식으로 하나의 프로세스 안에서 Connector와 Task를 전부 실행합니다.

  • Distributed Mode ( 분산 모드 ) 여러 Worker가 클러스터 형태로 실행되는 방식 장애가 발생하면 다른 Worker가 Task를 재할당하여 고가용성을 제공합니다.

Kafka Connect 실행


Debezium Connector for MySQL 플러그인 설치

https://debezium.io/releases/1.5/ Debezium 사이트에서 MySQL 커넥터 플러그인을 다운로드

debezium-connector-mysql-1.5.4.Final-plugin.tar.gz을 다운로드

압축을 풀면 아래와 같은 파일들이 존재

debezium-connector-mysql/CHANGELOG.md
debezium-connector-mysql/CONTRIBUTE.md
debezium-connector-mysql/COPYRIGHT.txt
debezium-connector-mysql/LICENSE-3rd-PARTIES.txt
debezium-connector-mysql/LICENSE.txt
debezium-connector-mysql/README.md
debezium-connector-mysql/README_ZH.md
debezium-connector-mysql/debezium-core-1.5.4.Final.jar
debezium-connector-mysql/debezium-api-1.5.4.Final.jar
debezium-connector-mysql/guava-30.0-jre.jar
debezium-connector-mysql/failureaccess-1.0.1.jar
debezium-connector-mysql/debezium-ddl-parser-1.5.4.Final.jar
debezium-connector-mysql/antlr4-runtime-4.7.2.jar
debezium-connector-mysql/mysql-binlog-connector-java-0.25.1.jar
debezium-connector-mysql/mysql-connector-java-8.0.21.jar
debezium-connector-mysql/debezium-connector-mysql-1.5.4.Final.jar

plugin.path 경로 수정


카프카 컨테이너에 접속하여 /opt/kafka/config/connect-distributed.properties 파일을 수정

( Kafka 커넥터 실행 환경을 구성 )

Kafka Connect는 Connector를 외부 플러그인 형태( JAR )로 동작시킴

⇒ plugin.path는 Kafka Connect가 어디에서 커넥터를 로딩할지 지정하는 설정

kafka 파일/config/connect-distributed.properties 내의 plugin.path를 /Users/taehyun.park/Downloads으로 설정

⇒ 이전에 다운로드 한 debezium-connector-mysql 폴더 내의 jar 파일을 읽도록 함

왜 경로를 …/Downloads/debezium-connector-mysql로 하지 않는가 ??

Kafka Connect는 plugin.path 안의 하위 디렉토리들을 스캔하므로 plugin.path에 지정된 경로 자체에 JAR 파일이 있으면 인식하지 못하기 때문

커넥터를 재시작하고 Kafka Connect에 현재 로드된 커넥터( 플러그인 )의 목록을 확인하는 명령어인

curl -s http://localhost:8083/connector-plugins | jq 를 통해 플러그인 목록을 확인해보면

taehyun.park@bagtaegyeong-ui-MacBookAir ~ % curl -s http://localhost:8083/connector-plugins | jq
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.6.0"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.5.4.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.6.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.6.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.6.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.6.0"
  }
]

성공적으로 Debezium MySQL Connector 로드 가능

Kafka Connect 서버( worker ) 실행

터미널에서 Kafka 파일 경로에서

./bin/connect-distributed.sh config/connect-distributed.properties 명령어 실행

Connector 등록 및 실행 ( Debezium MySQL Connector를 설치해야 함 )

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "event-push-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "tiffndla0423",
    "database.server.id": "184054",
    "database.server.name": "queue_system_db",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "queue_system_db",
    "table.include.list": "queue_system_db.outbox",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.queueing_system_db",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex": "(.*)",
    "transforms.addTopicPrefix.replacement": "$1"
  }
}'

[ 성공적으로 실행 완료 ]

Connect 관련 명령어

**특정 Connector 삭제**
curl --location --request DELETE 'http://localhost:8083/connectors/event-push-connector'

**Connector 목록 조회**
curl --location --request GET 'http://localhost:8083/connectors'
profile
꾸준하게

0개의 댓글