1️⃣ Kafka 확장 API 개요

Kafka는 기본적으로 ProducerConsumer를 통해
데이터를 송수신하는 메시징 시스템이다.
하지만 이것만으로는 대규모 파이프라인 설계
데이터 품질 관리, 실시간 분석을 구현하기 어렵다.

이를 해결하기 위해 Kafka는 세 가지 확장 API를 제공한다.

확장 API목적주요 역할
Kafka Connect외부 시스템 연동DB·API·파일 등 외부 소스와 싱크를 연결
Kafka Streams실시간 스트림 처리Kafka 내부 토픽 간 데이터 변환·집계
Schema Registry스키마 관리Avro/JSON/Protobuf 데이터 형식 검증 및 버전 관리

이 세 구성요소를 결합하면,
Kafka는 완전한 데이터 파이프라인 플랫폼으로 확장된다.


2️⃣ Kafka Connect: 데이터 파이프라인 자동화

Kafka Connect는 코드 없이 데이터를 주고받는 프레임워크다.
기존 프로듀서·컨슈머를 직접 작성하지 않아도
데이터 소스와 싱크를 연결할 수 있다.

📘 구조 개요

[Source Connector] → [Kafka Cluster] → [Sink Connector]
  • Source Connector: 외부 시스템 → Kafka
    (예: PostgreSQL, Wikimedia, Twitter)
  • Sink Connector: Kafka → 외부 시스템
    (예: ElasticSearch, S3, MongoDB)

📦 실습 예시: Wikimedia → Kafka → OpenSearch

① Wikimedia Source Connector

위키미디아의 SSE 스트림을 Kafka에 자동으로 입력한다.

  • 커넥터 클래스: io.conduktor.kafka.connect.wikimedia.WikimediaSourceConnector
  • 설정 파일 예시:
name=wikimedia-source
connector.class=io.conduktor.kafka.connect.wikimedia.WikimediaSourceConnector
topic.name=wikimedia.recentchange.connect

② ElasticSearch Sink Connector

Kafka의 wikimedia.recentchange 데이터를
OpenSearch 인덱스로 자동 전송한다.

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=wikimedia.recentchange
connection.url=http://localhost:9200
key.ignore=true
type.name=kafka-connect

📌 실행 명령

connect-standalone.sh \
config/connect-standalone.properties \
config/wikimedia.properties \
config/elasticsearch.properties

Kafka Connect는 ETL(Extract–Transform–Load) 의 “E”와 “L”을 담당한다.
소스와 싱크만 정의하면, 데이터 이동은 자동으로 수행된다.


💡 장점 요약

항목내용
코드 불필요설정 파일만으로 데이터 이동
커넥터 재사용Confluent Hub에서 200+ 커넥터 제공
확장성Worker 노드 추가로 수평 확장 가능
안정성내장 오프셋 저장, 재시도·재개 가능

💡 Tip:
새로운 커넥터를 직접 만들기보다,
Confluent Hub에서 검증된 커넥터를 우선 확인하라.


3️⃣ Kafka Streams: 실시간 데이터 처리 API

Kafka Streams는 Kafka 내부 토픽 간 변환·집계를 위한 라이브러리다.
별도의 클러스터가 필요 없고,
일반 Java 애플리케이션 수준으로 스트림 분석을 구현할 수 있다.

📘 기본 구조

Input Topic → Stream Processing → Output Topic

💡 주요 기능

기능설명
Transform데이터 필터링·매핑·그룹화
Aggregation실시간 집계 및 윈도우 연산
Join여러 토픽의 데이터를 결합
State Store내부 상태 저장 (changelog 토픽으로 복구 가능)

📊 실습 예시: Wikimedia 스트림 통계

입력: wikimedia.recentchange
출력: wikimedia.stats.bots, wikimedia.stats.websites, wikimedia.stats.timeseries

  • 봇과 사람 편집 수 비교
  • 웹사이트별 변경 횟수 집계
  • 10초 윈도 단위 이벤트 수 계산
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("wikimedia.recentchange");

// 로봇 여부 기준 분기
KStream<String, String>[] branches = input.branch(
  (key, value) -> value.contains("\"bot\":true"),
  (key, value) -> value.contains("\"bot\":false")
);

branches[0].to("wikimedia.stats.bots");
branches[1].to("wikimedia.stats.humans");

// 웹사이트별 집계
input.groupByKey()
     .count(Materialized.as("website-counts"))
     .toStream()
     .to("wikimedia.stats.websites");

💡 Kafka Streams의 강점

  • 배치가 아닌 완전 실시간 처리
  • 장애 시 자동 복구 (changelog 토픽)
  • 정확히 한 번(Exactly-once) 처리 보장

4️⃣ Schema Registry: 데이터 품질 보증

Kafka는 메시지를 바이트(byte) 단위로 처리하므로
기본적으로 데이터의 형식·유효성을 검증하지 않는다.

이 때문에 프로듀서가 필드를 잘못 보냈거나
컨슈머가 기대하는 스키마가 달라지면 오류가 발생한다.

이를 해결하는 시스템이 Schema Registry다.


📘 구조

Producer ↔ Schema Registry ↔ Kafka ↔ Consumer
  • Producer는 메시지 전송 전 Schema Registry에 스키마 등록
  • Consumer는 메시지 읽기 시 스키마 버전 확인
  • 스키마 호환성을 자동 검증 (Backward/Forward/Full)

🧩 Avro 예시

① 스키마 등록

{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    { "name": "f1", "type": "string" }
  ]
}

② 프로듀서 메시지

{ "f1": "value1" }

→ 유효. 스키마에 정의된 f1 필드 존재.

③ 잘못된 메시지

{ "f2": "value1" }

오류 발생. f2는 정의되지 않은 필드.


🔁 스키마 진화

새로운 필드를 추가할 때 호환성을 유지해야 한다.

{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    { "name": "f1", "type": "string" },
    { "name": "f2", "type": "int", "default": 0 }
  ]
}
  • f2 추가 (기본값 0 지정)
  • Schema Registry에서 Backward compatible 로 인정
  • 구버전 프로듀서/컨슈머 모두 정상 동작

💡 Schema Registry는 데이터 계약(Data Contract) 을 보장한다.
→ “깨지지 않는 데이터 파이프라인”을 위한 핵심 구성 요소.


5️⃣ Kafka 확장 API 선택 가이드

목적추천 API설명
외부 시스템 연동Kafka ConnectJDBC, S3, Elasticsearch 등 연결
실시간 변환/집계Kafka StreamsKafka ↔ Kafka 스트림 처리
데이터 포맷/유효성 관리Schema Registry스키마 기반 데이터 계약 관리
단순 송수신Producer / Consumer API코드 기반 저수준 제어

💡 확장 조합 예시

[Kafka Connect Source] → [Kafka Streams] → [Schema Registry] → [Kafka Connect Sink]

→ 데이터가 들어오고, 변환되고, 검증되고, 저장되는 전체 자동 파이프라인 완성.


💬 결론

Kafka는 단순한 메시징 시스템을 넘어
확장 가능한 데이터 스트리밍 플랫폼으로 발전했다.

  • Kafka Connect → 외부 연동과 ETL 자동화
  • Kafka Streams → 실시간 집계와 분석
  • Schema Registry → 데이터 신뢰성과 호환성 확보

이 세 가지를 통합하면
코드 한 줄 없이도 안정적인 스트림 데이터 파이프라인을 구축할 수 있다.

profile
okorion's Tech Study Blog.

0개의 댓글