Apache Kafka - Kafka Connect, Kafka Streams, Schema Registry

이건·2025년 5월 11일

Kafka

목록 보기
17/18

카프카 생태계는 데이터 파이프라인 구축과 실시간 데이터 처리를 위한 다양한 컴포넌트를 제공한다.


Kafka Connect

Kafka Connect는 Apache Kafka의 오픈소스 구성요소로, 데이터베이스, 키-값 저장소, 검색 인덱스 및 파일 시스템 간의 데이터 통합을 위한 중앙 집중식 데이터 허브 역할을 한다. 쉽게 말해, 카프카를 사용하여 외부 시스템과 데이터를 주고 받기 위한 프레임워크다.

Kafka Connect의 주요 특징

  1. 코드 없는 데이터 통합: 별도의 코딩 없이 JSON 형태의 설정만으로 데이터 파이프라인을 구성할 수 있다.
  2. 다양한 시스템 지원: 다양한 데이터 소스와 타겟 시스템 간의 연동을 지원하는 커넥터를 제공한다.
  3. 확장성: 대용량 데이터 세트를 효율적으로 이동시킬 수 있다.

Kafka Connect의 구성요소

  1. Worker: Connector와 Task를 실행하는 프로세스다. Worker는 두 가지 모드로 운영될 수 있다:

    • 단독 모드(Standalone mode): 단일 Worker 프로세스 내에서 실행되는 모드로, 주로 개발 및 테스트 환경에서 사용된다.
    • 분산 모드(Distributed mode): 여러 Worker 프로세스에서 실행되는 모드로, 프로덕션 환경에서 고가용성과 확장성을 위해 사용된다.
  2. Connector: 데이터 복사 작업을 관리하며, 두 가지 유형이 있다:

    • Source Connector: 외부 시스템에서 Kafka로 데이터를 가져온다.
    • Sink Connector: Kafka에서 외부 시스템으로 데이터를 내보낸다.
  3. Task: 실제로 데이터를 이동시키는 작업 단위다. 하나의 Connector는 여러 Task를 병렬로 실행할 수 있으며, 각 Task는 특정 파티션을 처리한다. Task 수를 늘리면 병렬 처리가 가능해 전체적인 처리량이 향상된다.

Kafka Connect 사용 예시

다음은 Source Connector와 Sink Connector의 간단한 설정 예시:

// Source Connector 예시
{
  "name": "ecommerce-source-connect",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/ecommerce",
    "connection.user": "root",
    "connection.password": "2721",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "ecommerce.users",
    "topic.prefix": "ecommerce_",
    "tasks.max": "1",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1
  }
}
// Sink Connector 예시
{
  "name": "ecommerce-sink-connect",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/ecommerce",
    "connection.user": "root",
    "connection.password": "2721",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "false",
    "tasks.max": "1",
    "topics": "ecommerce_users"
  }
}

Kafka Streams

Kafka Streams는 Kafka 기능 위에 구축된 라이브러리로, 실시간 데이터 스트림을 처리하고 분석하기 위한 간단한 방법을 제공한다. 카프카에서 스트리밍 데이터를 실시간으로 처리하고 분석하기 위한 자바 라이브러리다.

Kafka Streams의 주요 특징

  1. 분산 처리: Kafka Streams 애플리케이션은 분산 시스템으로 쉽게 확장될 수 있다.
  2. 상태 저장: 내부적으로 RocksDB와 같은 내장 상태 저장소를 활용하여 처리 중인 데이터를 유지한다.
  3. 탄력성과 내결함성: 애플리케이션 장애나 Kafka 클러스터의 변화에 강하게 설계되어 있다.
  4. MSA 아키텍처 적합: 마이크로서비스 아키텍처에 적합하며, 높은 처리량과 확장성을 제공한다.

Kafka와 Kafka Streams의 차이점

  1. 처리 유형:

    • Kafka: 기본적인 메시지 전송과 저장에 초점을 맞춘다.
    • Kafka Streams: 복잡한 실시간 스트림 처리 작업(데이터 집계, 변환, 필터링, 조인 등)에 필요하다.
  2. 복잡성:

    • Kafka: 메시지를 생성, 저장, 소비하는 단순한 작업에 적합하다.
    • Kafka Streams: 상태 관리, 윈도잉, 시간 기반 연산 등의 복잡한 스트림 처리를 수행할 수 있다.
  3. 통합:

    • Kafka: 다양한 소스에서 데이터를 수집하거나 다양한 대상 시스템으로 데이터를 전송하는데 사용된다.
    • Kafka Streams: Kafka의 데이터를 처리하여 다른 Kafka 토픽에 결과를 저장하거나 외부 시스템과 통합하는데 사용된다.

Kafka Streams의 핵심 개념: KStream

KStream은 카프카 스트림즈에서 가장 기본이 되는 스트림으로, 레코드(키-값 쌍)의 연속적인 흐름을 나타낸다. 실시간으로 데이터를 처리하는 데 사용되며, 필터링, 매핑, 집계 등의 연산을 수행할 수 있다.

Kafka Streams 예제

다음은 Kafka Streams를 사용하여 텍스트를 단어별로 분리하고, 각 단어의 출현 빈도를 카운트하는 간단한 예제:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;

public class App {
    private static String kafkaHost = "localhost";
    private static String kafkaPort = "9092";
    private static String consumerGroupId = "word-count-consumer-group";
    
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream("TextLinesTopic");
        
        // TextLinesTopic의 데이터를 출력하는 코드
        textLines.foreach((key, value) -> 
            System.out.println("Key: [" + key + "] Value: [" + value + "]"));
        
        // 단어를 카운트하는 코드
        KTable wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.as("counts-store"));
        
        // 단어 카운트 결과를 출력하는 코드
        wordCounts.toStream().foreach((word, count) -> {
            System.out.println("Word: [" + word + "] Count: [" + count + "]");
        });
        
        // 단어 카운트 결과를 Kafka WordsWithCountsTopic Topic에 전송하는 코드
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(getProperties()));
        streams.start();
    }
    
    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        return props;
    }
}

Schema Registry

Schema Registry는 카프카 클라이언트 사이에서 메시지의 스키마를 저장, 관리하는 웹 애플리케이션이다. 카프카는 메시지를 바이트 형태로 전달하기 때문에, 데이터의 구조(스키마)를 관리하는 것이 중요하다.

Schema Registry의 필요성

카프카는 클라이언트 사이의 관계를 끊어 구조적인 결합도를 낮추지만, 메시지 구조에 대한 강한 결합도는 여전히 존재한다. 예를 들어:

  1. 프로듀서는 어떤 컨슈머가 메시지를 가져갈지 모른다.
  2. 컨슈머는 어떤 프로듀서가 메시지를 보냈는지 모른다.
  3. 프로듀서가 스키마를 변경하여 메시지를 발행하면, 컨슈머는 이 상황을 알지 못하고 메시지 처리 중 장애가 발생할 수 있다.

Schema Registry의 주요 기능

  1. 토픽별 메시지 스키마 버전 관리: 토픽별로 메시지 Key 또는 Value의 스키마 버전을 관리한다.
  2. 스키마 호환성 규칙 강제: 스키마 버전 간 호환성을 강제하여 개발 운영 규칙을 설정한다.
  3. 스키마 버전 조회: REST API를 통해 스키마 버전을 조회할 수 있다.

스키마 호환성 유형

Schema Registry는 다음과 같은 호환성 유형을 지원한다:

  1. Backward: 컨슈머는 새 스키마로 메시지를 처리하지만 이전 스키마도 처리할 수 있다. 필드 삭제나 기본값이 있는 필드 추가 시 적용된다.
  2. Forward: 컨슈머는 이전 스키마로 메시지를 처리하지만 새 스키마도 처리할 수 있다. 필드 추가나 기본값이 있는 필드 삭제 시 적용된다.
  3. Full: Backward와 Forward를 모두 가진다. 기본값이 있는 필드를 추가하거나 삭제할 때 적용된다.
  4. None: 스키마 호환성을 체크하지 않는다.

Schema Registry의 작동 방식

  1. 스키마 등록: 프로듀서는 메시지 스키마를 Schema Registry에 등록한다. 각 스키마는 고유한 Schema ID를 부여받는다.
  2. 메시지 직렬화: 프로듀서는 메시지를 직렬화할 때 해당 스키마의 Schema ID를 포함시킨다.
  3. 메시지 역직렬화: 컨슈머는 메시지를 역직렬화할 때 Schema ID를 사용하여 적절한 스키마를 식별하고, 메시지를 원래의 데이터 형식으로 변환한다.
  4. 로컬 캐시 활용: 매번 Schema Registry를 확인하는 작업은 지연을 초래하므로, 로컬 캐시에서 매핑되는 스키마 및 Schema ID를 찾아 성능을 향상시킨다.

Schema Evolution 예시

다음은 스키마 진화(Schema Evolution)의 예시:

// 초기 스키마
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
// 스키마 변경: 이메일 필드를 선택적으로 변경
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
// 스키마 변경: 전화번호 필드 추가
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}

Kafka 생태계와 API 선택 가이드

데이터 소스에 따른 API 선택

원천 데이터베이스에서 Kafka로

이미 데이터베이스나 다른 시스템에 저장된 데이터를 Kafka로 가져오려면 Kafka Connect Source를 사용하는 것이 가장 적합하다. Kafka Connect Source는 기존 데이터 시스템과 Kafka 간의 연결을 쉽게 구성할 수 있게 해주는 프레임워크다.

실시간 데이터 생성에서 Kafka로

트럭이나 IoT 장치와 같은 소스에서 생성되는 실시간 데이터를 직접 Kafka로 전송하려면 Kafka Producer API를 사용하는 것이 좋다. 이는 프로그래밍 방식으로 데이터를 Kafka 토픽에 직접 게시할 수 있게 해준다.

Kafka에서 Kafka로

Kafka 토픽의 데이터를 처리하고 결과를 다른 Kafka 토픽으로 보내야 하는 경우, Kafka Streams가 이상적인 선택이다. Kafka Streams는 Kafka 기반 스트리밍 애플리케이션을 개발하기 위한 클라이언트 라이브러리로, 데이터 변환, 집계, 조인 등의 작업을 수행할 수 있다.

SQL 기반 데이터 처리

Kafka 데이터를 SQL 쿼리로 처리하고 싶다면 KSQL을 사용할 수 있다. KSQL은 내부적으로 Kafka Streams를 활용하여 SQL 인터페이스를 제공하는 데이터베이스 시스템이다. 이를 통해 개발자는 복잡한 스트리밍 로직을 SQL 문법으로 쉽게 표현할 수 있다.

데이터 타겟에 따른 API 선택

Kafka에서 외부 저장소로

Kafka의 데이터를 외부 저장소나 분석 시스템으로 내보내려면 Kafka Connect Sink가 적합하다. 이 API는 Kafka에서 다양한 대상 시스템으로 데이터를 효율적으로 전송할 수 있게 해준다.

특정 액션 트리거

단순히 이메일 발송과 같은 특정 액션을 트리거하려면 Kafka Consumer API를 사용하는 것이 더 적합할 수 있다. 이 API를 사용하면 Kafka 토픽에서 메시지를 읽고 필요한 비즈니스 로직을 직접 구현할 수 있다.

데이터 품질 관리

데이터 파이프라인에서 데이터 타입과 형식이 올바른지 확인하려면 Schema Registry를 활용할 수 있다. Schema Registry는 Kafka 메시지의 스키마를 중앙에서 관리하고, 생산자와 소비자 간의 스키마 호환성을 보장하는 역할을 한다.

결론

Kafka 생태계는 다양한 데이터 처리 요구사항을 충족시키기 위한 풍부한 API와 도구를 제공한다. 각 사용 사례에 맞는 적절한 API를 선택함으로써 효율적이고 강력한 데이터 파이프라인을 구축할 수 있다. 이러한 이해는 Kafka를 아키텍처 관점에서 효과적으로 활용하는 데 중요한 기반이 된다.

0개의 댓글