카프카 생태계는 데이터 파이프라인 구축과 실시간 데이터 처리를 위한 다양한 컴포넌트를 제공한다.
Kafka Connect는 Apache Kafka의 오픈소스 구성요소로, 데이터베이스, 키-값 저장소, 검색 인덱스 및 파일 시스템 간의 데이터 통합을 위한 중앙 집중식 데이터 허브 역할을 한다. 쉽게 말해, 카프카를 사용하여 외부 시스템과 데이터를 주고 받기 위한 프레임워크다.
Worker: Connector와 Task를 실행하는 프로세스다. Worker는 두 가지 모드로 운영될 수 있다:
Connector: 데이터 복사 작업을 관리하며, 두 가지 유형이 있다:
Task: 실제로 데이터를 이동시키는 작업 단위다. 하나의 Connector는 여러 Task를 병렬로 실행할 수 있으며, 각 Task는 특정 파티션을 처리한다. Task 수를 늘리면 병렬 처리가 가능해 전체적인 처리량이 향상된다.
다음은 Source Connector와 Sink Connector의 간단한 설정 예시:
// Source Connector 예시
{
"name": "ecommerce-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/ecommerce",
"connection.user": "root",
"connection.password": "2721",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "ecommerce.users",
"topic.prefix": "ecommerce_",
"tasks.max": "1",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1
}
}
// Sink Connector 예시
{
"name": "ecommerce-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://localhost:3306/ecommerce",
"connection.user": "root",
"connection.password": "2721",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "ecommerce_users"
}
}
Kafka Streams는 Kafka 기능 위에 구축된 라이브러리로, 실시간 데이터 스트림을 처리하고 분석하기 위한 간단한 방법을 제공한다. 카프카에서 스트리밍 데이터를 실시간으로 처리하고 분석하기 위한 자바 라이브러리다.
처리 유형:
복잡성:
통합:
KStream은 카프카 스트림즈에서 가장 기본이 되는 스트림으로, 레코드(키-값 쌍)의 연속적인 흐름을 나타낸다. 실시간으로 데이터를 처리하는 데 사용되며, 필터링, 매핑, 집계 등의 연산을 수행할 수 있다.
다음은 Kafka Streams를 사용하여 텍스트를 단어별로 분리하고, 각 단어의 출현 빈도를 카운트하는 간단한 예제:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class App {
private static String kafkaHost = "localhost";
private static String kafkaPort = "9092";
private static String consumerGroupId = "word-count-consumer-group";
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
KStream textLines = builder.stream("TextLinesTopic");
// TextLinesTopic의 데이터를 출력하는 코드
textLines.foreach((key, value) ->
System.out.println("Key: [" + key + "] Value: [" + value + "]"));
// 단어를 카운트하는 코드
KTable wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("counts-store"));
// 단어 카운트 결과를 출력하는 코드
wordCounts.toStream().foreach((word, count) -> {
System.out.println("Word: [" + word + "] Count: [" + count + "]");
});
// 단어 카운트 결과를 Kafka WordsWithCountsTopic Topic에 전송하는 코드
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(getProperties()));
streams.start();
}
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroupId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
return props;
}
}
Schema Registry는 카프카 클라이언트 사이에서 메시지의 스키마를 저장, 관리하는 웹 애플리케이션이다. 카프카는 메시지를 바이트 형태로 전달하기 때문에, 데이터의 구조(스키마)를 관리하는 것이 중요하다.
카프카는 클라이언트 사이의 관계를 끊어 구조적인 결합도를 낮추지만, 메시지 구조에 대한 강한 결합도는 여전히 존재한다. 예를 들어:
Schema Registry는 다음과 같은 호환성 유형을 지원한다:
다음은 스키마 진화(Schema Evolution)의 예시:
// 초기 스키마
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
// 스키마 변경: 이메일 필드를 선택적으로 변경
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
// 스키마 변경: 전화번호 필드 추가
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "phone", "type": ["null", "string"], "default": null}
]
}
이미 데이터베이스나 다른 시스템에 저장된 데이터를 Kafka로 가져오려면 Kafka Connect Source를 사용하는 것이 가장 적합하다. Kafka Connect Source는 기존 데이터 시스템과 Kafka 간의 연결을 쉽게 구성할 수 있게 해주는 프레임워크다.
트럭이나 IoT 장치와 같은 소스에서 생성되는 실시간 데이터를 직접 Kafka로 전송하려면 Kafka Producer API를 사용하는 것이 좋다. 이는 프로그래밍 방식으로 데이터를 Kafka 토픽에 직접 게시할 수 있게 해준다.
Kafka 토픽의 데이터를 처리하고 결과를 다른 Kafka 토픽으로 보내야 하는 경우, Kafka Streams가 이상적인 선택이다. Kafka Streams는 Kafka 기반 스트리밍 애플리케이션을 개발하기 위한 클라이언트 라이브러리로, 데이터 변환, 집계, 조인 등의 작업을 수행할 수 있다.
Kafka 데이터를 SQL 쿼리로 처리하고 싶다면 KSQL을 사용할 수 있다. KSQL은 내부적으로 Kafka Streams를 활용하여 SQL 인터페이스를 제공하는 데이터베이스 시스템이다. 이를 통해 개발자는 복잡한 스트리밍 로직을 SQL 문법으로 쉽게 표현할 수 있다.
Kafka의 데이터를 외부 저장소나 분석 시스템으로 내보내려면 Kafka Connect Sink가 적합하다. 이 API는 Kafka에서 다양한 대상 시스템으로 데이터를 효율적으로 전송할 수 있게 해준다.
단순히 이메일 발송과 같은 특정 액션을 트리거하려면 Kafka Consumer API를 사용하는 것이 더 적합할 수 있다. 이 API를 사용하면 Kafka 토픽에서 메시지를 읽고 필요한 비즈니스 로직을 직접 구현할 수 있다.
데이터 파이프라인에서 데이터 타입과 형식이 올바른지 확인하려면 Schema Registry를 활용할 수 있다. Schema Registry는 Kafka 메시지의 스키마를 중앙에서 관리하고, 생산자와 소비자 간의 스키마 호환성을 보장하는 역할을 한다.
Kafka 생태계는 다양한 데이터 처리 요구사항을 충족시키기 위한 풍부한 API와 도구를 제공한다. 각 사용 사례에 맞는 적절한 API를 선택함으로써 효율적이고 강력한 데이터 파이프라인을 구축할 수 있다. 이러한 이해는 Kafka를 아키텍처 관점에서 효과적으로 활용하는 데 중요한 기반이 된다.