
Kafka Connect는 Kafka와 외부 시스템 간의 데이터를 실시간으로 연동하기 위한 표준화된 프레임워크다.
Kafka의 Producer나 Consumer를 직접 구현하지 않고도 데이터베이스, 파일, 클라우드 스토리지 등 다양한 데이터 소스에서 Kafka로 데이터를 수집하거나 Kafka 데이터를 외부 시스템으로 내보낼 수 있다.
데이터 통합(Integration)과 ETL(Extract, Transform, Load)을 단순화하기 위해 만들어졌으며, 확장성, 복원성, 표준화된 인터페이스를 제공한다.
Kafka Connect는 데이터 송수신 과정에서 다음 네 가지 구성요소를 중심으로 동작한다.
Connector는 Kafka와 외부 시스템 간의 데이터 연동 단위다.
두 가지 유형이 있으며 각각 다음 역할을 수행한다.
각 Connector는 여러 개의 Task로 분할되어 병렬로 실행될 수 있으며, 설정은 REST API를 통해 관리된다.

SMT는 단일 메시지 단위에서 수행되는 변환 로직이다.
Kafka Connect는 메시지를 전송하기 전후에 데이터를 변형할 수 있도록 지원하며 변환 규칙은 Connector의 설정(Config)에 직접 명시된다.
{
"transforms": "create_key, extract_key",
"transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.create_key.fields": "customer_id",
"transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract_key.field": "customer_id"
}
| SMT 클래스 | 기능 설명 |
|---|---|
| ValueToKey | Value 레코드의 필드를 Key로 변환 |
| ExtractField | Struct나 Map에서 특정 필드만 추출 |
| RegexRouter | 정규식을 사용해 토픽명 변경 |

Converter는 Kafka Connect 내부에서 메시지를 직렬화(Serialization)·역직렬화(Deserialization) 하는 컴포넌트다.
Kafka Connect는 다양한 포맷의 데이터를 처리하기 위해 JSON, Avro, Protobuf 등을 지원한다.
| Converter | 설명 |
|---|---|
| JsonConverter | JSON 포맷 기반, 구조가 단순하지만 레코드마다 스키마를 포함하면 메시지 크기가 커짐 |
| AvroConverter | Schema Registry를 통해 스키마 중복 제거, 효율적인 바이너리 포맷 |
| ProtobufConverter | 구글 Protobuf 기반, 스키마 관리 및 타입 안정성 우수 |
| StringConverter / ByteArrayConverter | 단순 문자열 또는 바이트 배열 변환용 |
{
"schema": {
"type": "struct",
"fields": [
{"type": "int32", "field": "customer_id"},
{"type": "string", "field": "email_address"},
{"type": "string", "field": "full_name"}
],
"name": "mysql02.oc.customers.Value"
},
"payload": {
"customer_id": 864,
"email_address": "testuser_864",
"full_name": "testuser_864"
}
}
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
schemas.enable=true 설정 시 Schema 정보를 JSON에 포함한다. Connector의 동작은 Config 파일 또는 REST API를 통해 설정된다.
Config에는 Connector의 클래스명, 실행 스레드 수, 대상 토픽, 변환(SMT), Converter 등이 포함된다.
Config 정보는 Kafka의 내부 토픽(connect-configs)에 저장되어, 재기동 후에도 자동으로 복원된다.
Kafka Connect는 분산 환경에서 다음 두 요소로 구성된다.
Connector의 실행 단위 스레드로, 설정된 tasks.max 값에 따라 여러 개 생성된다.
각 Task는 병렬로 데이터 송수신을 수행하며, Worker에 의해 관리된다.
Kafka Connect의 실행 프로세스(JVM) 단위로, Connector와 Task를 관리한다.
Worker는 REST API를 통해 Connector를 생성, 수정, 삭제하며 Standalone 모드(단일 Worker) 또는 Distributed 모드(여러 Worker)로 구성할 수 있다.
Kafka Connect는 REST API를 통해 Connector를 관리한다.
| Method | 설명 |
|---|---|
| GET | 현재 실행 중인 모든 Connector 목록, 설정(config), 상태(status) 조회 |
| POST | Connector 생성 및 등록, 또는 개별 재기동 |
| PUT | Connector 일시정지, 재시작, 설정 갱신 및 유효성 검사 |
| DELETE | Connector 삭제 |
Connector 생성 시 HTTP 201 응답을 받더라도, 실제 로그를 통해 정상 기동 여부를 반드시 확인해야 한다.
POST /connectors) tasks.max에 명시된 수만큼 최대 Task Thread를 생성 Kafka Connect는 Connector의 실행 상태, 설정, 오프셋 정보를 관리하기 위해 내부 토픽을 사용한다.
| 토픽명 | 역할 |
|---|---|
| connect-offsets | Source Connector별 메시지 전송 오프셋 저장 (중복 방지) |
| connect-configs | Connector 설정 정보 저장 (재기동 시 복원) |
| connect-status | Connector 상태 정보 저장 (Running, Paused 등) |
| __consumer_offsets | Sink Connector의 메시지 소비 오프셋 관리 |
Kafka Connect의 연동 대상은 대부분 Schema를 가진 시스템이다.
데이터의 구조를 명확히 정의하지 않으면, Kafka와 외부 시스템 간 데이터 호환성이 떨어진다.
Schema를 사용하면 다음을 보장할 수 있다.
Kafka Connect는 Schema를 기반으로 데이터를 구조화하여 효율적으로 전송하며 Schema Registry와 결합할 경우 중복 Schema 전송 없이 경량화된 데이터 파이프라인을 구성할 수 있다.