
Apache Kafka Connect는 카프카와 외부 시스템 간의 데이터를 안정적이고 확장 가능하게 스트리밍하기 위한 프레임워크입니다. 복잡한 코드 작성 없이 다양한 데이터 소스와 카프카를 손쉽게 연결할 수 있어, 데이터 파이프라인 구축을 단순화합니다.
기존에는 데이터를 카프카로 보내거나 카프카에서 데이터를 가져오기 위해 매번 Producer와 Consumer 애플리케이션을 직접 개발해야 했습니다. 이런 반복적인 작업은 다음과 같은 문제를 야기했습니다:
Kafka Connect는 이러한 문제를 해결하기 위해 표준화된 방식으로 데이터 통합을 제공합니다.
Kafka Connect는 다음과 같은 주요 컴포넌트로 구성됩니다:
커넥터는 Kafka Connect의 핵심 논리 단위로, 데이터 복사 작업을 정의합니다. 두 가지 유형이 있습니다:
커넥터가 생성하는 실제 작업 단위입니다. 병렬 처리를 위해 여러 태스크로 분산될 수 있습니다.
커넥터와 태스크를 실행하는 프로세스입니다. 두 가지 실행 모드를 지원합니다:
데이터 형식을 변환합니다 (예: JSON, Avro, String).
데이터를 처리하는 과정에서 간단한 변환 작업을 수행합니다.
외부 시스템 → Source Connector → Kafka Topic → Sink Connector → 외부 시스템
Kafka Connect는 다양한 사전 제작된 커넥터를 제공합니다:
관계형 데이터베이스와의 통합을 위한 가장 널리 사용되는 커넥터입니다.
Source Connector:
Sink Connector:
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "user",
"connection.password": "password",
"table.whitelist": "users,orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-"
}
}
Kafka의 데이터를 Elasticsearch로 전송하여 검색 및 분석을 가능하게 합니다.
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "logs",
"connection.url": "http://localhost:9200",
"type.name": "_doc"
}
}
Kafka 토픽의 데이터를 Amazon S3에 저장합니다 (데이터 레이크 구축).
NoSQL 데이터베이스인 MongoDB와의 양방향 데이터 전송을 지원합니다.
빅데이터 처리를 위해 Hadoop HDFS에 데이터를 저장합니다.
로컬 파일 시스템과 Kafka 간 데이터 전송 (개발/테스트용).
실시간으로 데이터베이스의 변경사항을 감지하고 Kafka로 스트리밍합니다.
활용 예시:
MySQL (Orders Table) → JDBC Source Connector → Kafka (orders topic)
사용 커넥터: Debezium MySQL Connector, JDBC Source Connector
분산 시스템의 로그를 중앙 집중식으로 수집하고 분석합니다.
활용 예시:
Application Logs → FileStream Source → Kafka → Elasticsearch Sink → Kibana
대량의 데이터를 저렴한 저장소에 보관하고 나중에 분석합니다.
활용 예시:
Kafka Topics → S3 Sink Connector → Amazon S3 → AWS Athena
데이터를 실시간으로 검색 엔진에 색인화합니다.
활용 예시:
Product DB → Source Connector → Kafka → Elasticsearch Sink → 검색 서비스
모든 상태 변경을 이벤트로 저장하는 패턴을 구현합니다.
활용 예시:
운영 데이터베이스의 데이터를 분석용 데이터 웨어하우스로 전송합니다.
활용 예시:
OLTP DB → Source Connector → Kafka → BigQuery Sink → BI Tool (Tableau)
서로 다른 마이크로서비스의 데이터베이스를 동기화합니다.
활용 예시:
IoT 디바이스에서 생성되는 대량의 센서 데이터를 수집합니다.
활용 예시:
개발 및 테스트 환경에 적합합니다.
# Connect 설정 파일
vi config/connect-standalone.properties
# Bootstrap 서버 설정
bootstrap.servers=localhost:9092
# 데이터 형식 변환 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 오프셋 저장 위치
offset.storage.file.filename=/tmp/connect.offsets
# Standalone 모드로 실행
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connector-config.properties
프로덕션 환경에서 권장되는 방식입니다.
# Distributed 설정 파일
vi config/connect-distributed.properties
# 클러스터 식별자
group.id=connect-cluster
# Bootstrap 서버
bootstrap.servers=localhost:9092
# 내부 토픽 설정
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# 복제 팩터
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# REST API 포트
rest.port=8083
# Distributed 모드로 실행
bin/connect-distributed.sh config/connect-distributed.properties
Kafka Connect는 RESTful API를 제공하여 커넥터를 관리할 수 있습니다.
curl http://localhost:8083/connectors
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "user",
"connection.password": "password",
"table.whitelist": "users",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-"
}
}'
curl http://localhost:8083/connectors/mysql-source/status
curl -X DELETE http://localhost:8083/connectors/mysql-source
# 일시 중지
curl -X PUT http://localhost:8083/connectors/mysql-source/pause
# 재개
curl -X PUT http://localhost:8083/connectors/mysql-source/resume
{
"name": "mysql-source-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/ecommerce",
"connection.user": "kafka_user",
"connection.password": "secure_password",
"table.whitelist": "users,orders",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "mysql-",
"poll.interval.ms": "5000",
"batch.max.rows": "100"
}
}
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-source-config.json
# Kafka 토픽 확인
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 메시지 확인
bin/kafka-console-consumer.sh \
--topic mysql-users \
--from-beginning \
--bootstrap-server localhost:9092
프로덕션 환경에서는 항상 분산 모드를 사용하여 고가용성을 확보하세요.
{
"tasks.max": "3" // 병렬 처리를 위해 적절한 수 설정
}
{
"batch.max.rows": "500", // 한 번에 처리할 레코드 수
"poll.interval.ms": "5000" // 폴링 간격
}
{
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.deadletterqueue.topic.name": "dlq-topic"
}
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081"
}
기존 커넥터가 요구사항을 만족하지 못할 경우, 커스텀 커넥터를 개발할 수 있습니다.
public class CustomSourceConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
// 초기화 로직
}
@Override
public Class<? extends Task> taskClass() {
return CustomSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// 태스크 설정 반환
return configs;
}
@Override
public void stop() {
// 정리 로직
}
@Override
public ConfigDef config() {
// 설정 정의 반환
return CONFIG_DEF;
}
@Override
public String version() {
return "1.0.0";
}
}
# 압축 활성화
compression.type=lz4
# 배치 크기 증가
batch.size=32768
linger.ms=10
export KAFKA_HEAP_OPTS="-Xms2G -Xmx4G"
{
"tasks.max": "8", // CPU 코어 수에 맞춰 설정
"consumer.max.poll.records": "1000"
}
# 로그 확인
tail -f logs/connect.log
# 커넥터 상태 확인
curl http://localhost:8083/connectors/my-connector/status
offset.storage.topic 토픽 삭제 후 재시작| 비교 항목 | Kafka Connect | Custom Producer/Consumer | ETL 도구 |
|---|---|---|---|
| 개발 난이도 | 낮음 (설정만) | 높음 (코딩 필요) | 중간 |
| 유지보수 | 쉬움 | 어려움 | 중간 |
| 확장성 | 매우 높음 | 높음 | 중간 |
| 실시간 처리 | 우수 | 우수 | 배치 위주 |
| 표준화 | 높음 | 낮음 | 높음 |
| 커뮤니티 지원 | 우수 | - | 도구별 상이 |
Kafka Connect를 마스터했다면 다음 주제를 학습해보세요:
Kafka Connect는 복잡한 데이터 통합 작업을 간소화하고 표준화하는 강력한 도구입니다. 코드 작성 없이 설정만으로 다양한 시스템과 Kafka를 연결할 수 있어, 개발 시간을 크게 단축하고 유지보수 부담을 줄일 수 있습니다.
프로덕션 환경에서는 분산 모드로 실행하고, 적절한 모니터링과 에러 핸들링을 설정하며, 스키마 레지스트리를 활용하여 데이터 품질을 보장하는 것이 중요합니다.
실시간 데이터 파이프라인 구축을 위해 Kafka Connect를 적극 활용해보세요! 🚀