Kafka - Connect

develkkm·2025년 11월 12일
post-thumbnail

Kafka Connect란

Kafka Connect는 Kafka와 외부 시스템 간의 데이터를 실시간으로 연동하기 위한 표준화된 프레임워크다.

Kafka의 Producer나 Consumer를 직접 구현하지 않고도 데이터베이스, 파일, 클라우드 스토리지 등 다양한 데이터 소스에서 Kafka로 데이터를 수집하거나 Kafka 데이터를 외부 시스템으로 내보낼 수 있다.

데이터 통합(Integration)과 ETL(Extract, Transform, Load)을 단순화하기 위해 만들어졌으며, 확장성, 복원성, 표준화된 인터페이스를 제공한다.


Kafka Connect의 구성요소

Kafka Connect는 데이터 송수신 과정에서 다음 네 가지 구성요소를 중심으로 동작한다.

Connector

Connector는 Kafka와 외부 시스템 간의 데이터 연동 단위다.
두 가지 유형이 있으며 각각 다음 역할을 수행한다.

  • Source Connector
    외부 시스템에서 데이터를 읽어 Kafka 토픽으로 전송한다.
  • Sink Connector
    Kafka 토픽의 데이터를 외부 시스템으로 전송한다.

각 Connector는 여러 개의 Task로 분할되어 병렬로 실행될 수 있으며, 설정은 REST API를 통해 관리된다.


SMT (Single Message Transform)

SMT는 단일 메시지 단위에서 수행되는 변환 로직이다.

Kafka Connect는 메시지를 전송하기 전후에 데이터를 변형할 수 있도록 지원하며 변환 규칙은 Connector의 설정(Config)에 직접 명시된다.

  • 여러 SMT를 Chain 형태로 연속 적용할 수 있다.
  • Source → Kafka 혹은 Kafka → Sink 단계 어디에서든 적용할 수 있다.
  • 복잡한 데이터 변환에는 한계가 있지만, 필드 추출, 이름 변경, 포맷 변경 등 단순 가공에 적합하다.

SMT 예시 설정

{
  "transforms": "create_key, extract_key",
  "transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.create_key.fields": "customer_id",
  "transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extract_key.field": "customer_id"
}

주요 SMT 클래스

SMT 클래스기능 설명
ValueToKeyValue 레코드의 필드를 Key로 변환
ExtractFieldStruct나 Map에서 특정 필드만 추출
RegexRouter정규식을 사용해 토픽명 변경

Converter

Converter는 Kafka Connect 내부에서 메시지를 직렬화(Serialization)·역직렬화(Deserialization) 하는 컴포넌트다.

Kafka Connect는 다양한 포맷의 데이터를 처리하기 위해 JSON, Avro, Protobuf 등을 지원한다.

주요 Converter 유형

Converter설명
JsonConverterJSON 포맷 기반, 구조가 단순하지만 레코드마다 스키마를 포함하면 메시지 크기가 커짐
AvroConverterSchema Registry를 통해 스키마 중복 제거, 효율적인 바이너리 포맷
ProtobufConverter구글 Protobuf 기반, 스키마 관리 및 타입 안정성 우수
StringConverter / ByteArrayConverter단순 문자열 또는 바이트 배열 변환용

Json + Schema 구조 예시

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "field": "customer_id"},
      {"type": "string", "field": "email_address"},
      {"type": "string", "field": "full_name"}
    ],
    "name": "mysql02.oc.customers.Value"
  },
  "payload": {
    "customer_id": 864,
    "email_address": "testuser_864",
    "full_name": "testuser_864"
  }
}

기본 설정 예시

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
  • schemas.enable=true 설정 시 Schema 정보를 JSON에 포함한다.
  • 메시지 크기 증가를 방지하기 위해 Schema Registry를 이용해 Schema를 중앙 관리할 수 있다.

Config

Connector의 동작은 Config 파일 또는 REST API를 통해 설정된다.

Config에는 Connector의 클래스명, 실행 스레드 수, 대상 토픽, 변환(SMT), Converter 등이 포함된다.

Config 정보는 Kafka의 내부 토픽(connect-configs)에 저장되어, 재기동 후에도 자동으로 복원된다.


Kafka Connect 클러스터 구성요소

Kafka Connect는 분산 환경에서 다음 두 요소로 구성된다.

Task

Connector의 실행 단위 스레드로, 설정된 tasks.max 값에 따라 여러 개 생성된다.
각 Task는 병렬로 데이터 송수신을 수행하며, Worker에 의해 관리된다.

Worker

Kafka Connect의 실행 프로세스(JVM) 단위로, Connector와 Task를 관리한다.

Worker는 REST API를 통해 Connector를 생성, 수정, 삭제하며 Standalone 모드(단일 Worker) 또는 Distributed 모드(여러 Worker)로 구성할 수 있다.


REST API를 통한 관리

Kafka Connect는 REST API를 통해 Connector를 관리한다.

Method설명
GET현재 실행 중인 모든 Connector 목록, 설정(config), 상태(status) 조회
POSTConnector 생성 및 등록, 또는 개별 재기동
PUTConnector 일시정지, 재시작, 설정 갱신 및 유효성 검사
DELETEConnector 삭제

Connector 생성 시 HTTP 201 응답을 받더라도, 실제 로그를 통해 정상 기동 여부를 반드시 확인해야 한다.


Connector 생성 시 수행 프로세스

  1. REST API로 Connector 생성 요청(POST /connectors)
  2. Worker가 Connector Thread를 생성
  3. tasks.max에 명시된 수만큼 최대 Task Thread를 생성
  4. 각 Task Thread가 외부 데이터(Source) 또는 Kafka(Sink)와 연동
  5. Kafka Connect 내부 토픽에 오프셋, 상태, 설정을 저장

내부 토픽 구조

Kafka Connect는 Connector의 실행 상태, 설정, 오프셋 정보를 관리하기 위해 내부 토픽을 사용한다.

토픽명역할
connect-offsetsSource Connector별 메시지 전송 오프셋 저장 (중복 방지)
connect-configsConnector 설정 정보 저장 (재기동 시 복원)
connect-statusConnector 상태 정보 저장 (Running, Paused 등)
__consumer_offsetsSink Connector의 메시지 소비 오프셋 관리

Schema의 필요성

Kafka Connect의 연동 대상은 대부분 Schema를 가진 시스템이다.
데이터의 구조를 명확히 정의하지 않으면, Kafka와 외부 시스템 간 데이터 호환성이 떨어진다.

Schema를 사용하면 다음을 보장할 수 있다.

  • 데이터 타입 일관성 유지
  • 메시지 변환 및 역직렬화 안정성 확보
  • 데이터 포맷 호환성 유지 (JSON ↔ Avro ↔ Protobuf 변환 가능)

Kafka Connect는 Schema를 기반으로 데이터를 구조화하여 효율적으로 전송하며 Schema Registry와 결합할 경우 중복 Schema 전송 없이 경량화된 데이터 파이프라인을 구성할 수 있다.

profile
알던것을 더 확실하게

0개의 댓글