Provisioning, Kafka / Schema registry

Jeonghak Cho · May 11, 2025


🏳️‍🌈 [Questions]

  • How to install and test schema-registry

🔗 Contents

Running the Kafka broker

Running Kafka with Docker (standalone)

docker run -d  \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  apache/kafka:latest

Docker run option reference

  • -d: Runs the container in the background (detached mode).
  • --name kafka: Names the container kafka.
  • -e KAFKA_NODE_ID=1: Sets the Kafka node ID to 1. The ID must be unique within the Kafka cluster.
  • -e KAFKA_PROCESS_ROLES=broker,controller: Makes this node act as both broker and controller.
  • -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093: Makes Kafka listen on port 9092 for the PLAINTEXT listener and on port 9093 for the CONTROLLER listener.
  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: Sets localhost:9092 as the broker address that clients are told to connect to.
  • -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER: Sets CONTROLLER as the name of the listener used by the Kafka controller.
  • -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT: Sets the security protocol used by each listener (both are PLAINTEXT here).
  • -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093: Sets the quorum nodes used by the Kafka controller (node 1 at localhost:9093).
  • -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: Sets the replication factor of the offsets topic to 1, so it has a single replica.
  • -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1: Sets the replication factor of the transaction state log to 1, so it has a single replica.
  • -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1: Sets the minimum number of in-sync replicas (ISR) for the transaction state log to 1.
  • -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0: Sets the delay to 0 so the initial consumer group rebalance happens immediately.
  • -e KAFKA_NUM_PARTITIONS=3: Sets the default number of partitions for newly created topics to 3.
  • apache/kafka:latest: Uses the Kafka image tagged latest.
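To make the two listener settings easier to read together, here is a small Python sketch that parses the same strings used in the command above and prints which port and protocol each listener name ends up with. The function names are made up for illustration and are not part of Kafka.

```python
def parse_listeners(value):
    """Map listener name -> (host, port) from a KAFKA_LISTENERS-style string."""
    result = {}
    for entry in value.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def parse_protocol_map(value):
    """Map listener name -> security protocol from a NAME:PROTOCOL list."""
    result = {}
    for entry in value.split(","):
        name, protocol = entry.split(":", 1)
        result[name] = protocol
    return result


# Values copied from the docker run command above.
listeners = parse_listeners("PLAINTEXT://:9092,CONTROLLER://:9093")
protocols = parse_protocol_map("CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")

for name, (host, port) in listeners.items():
    # An empty host means "bind on all interfaces".
    print(f"{name}: host={host or '<all interfaces>'} port={port} protocol={protocols[name]}")
```

Every listener name in KAFKA_LISTENERS needs an entry in the protocol map, which is why CONTROLLER appears in both settings.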

Testing the Kafka broker

Connecting to the kafka container

docker exec -it kafka bash

cf9fccdc5f12:/var/lib/kafka/data$ export PATH=$PATH:/opt/kafka/bin/

Checking the Java version

8938869c94ce:/$ java --version
openjdk 21.0.6 2025-01-21 LTS
OpenJDK Runtime Environment Temurin-21.0.6+7 (build 21.0.6+7-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.6+7 (build 21.0.6+7-LTS, mixed mode, sharing)

Checking the Kafka version

26d922b2998b:/$ kafka-topics.sh --version
4.0.0

Creating topics

cf9fccdc5f12:~$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.

cf9fccdc5f12:~$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic2
Created topic test-topic2.

Listing topics

cf9fccdc5f12:~$ kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
test-topic
test-topic2

Deleting a topic

cf9fccdc5f12:~$ kafka-topics.sh --delete --topic test-topic2 --bootstrap-server localhost:9092

cf9fccdc5f12:~$ kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
test-topic

Viewing topic configuration

cf9fccdc5f12:~$ kafka-configs.sh --describe --topic test-topic --bootstrap-server localhost:9092
Dynamic configs for topic test-topic are:

Producer

cf9fccdc5f12:~$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
>

Consumer

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

Listing consumer groups

kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
console-consumer-73216
console-consumer-34026

Client setup

acho@DESKTOP-SCOK45O:~$ rm kafka_2.12-3.7.2.tgz
acho@DESKTOP-SCOK45O:~$ tar -xvf kafka_2.13-4.0.0.tgz

Backup

Unlike a traditional database, Kafka is usually backed up not by taking a "snapshot" but by replicating or exporting topic data. The following are the most common ways to back up Kafka.

MirrorMaker 2 (MM2)

The official tool for replicating (mirroring) data from one Kafka cluster to another.
It supports real-time replication and standby clusters for failover, and is used for DR (disaster recovery) or for synchronizing data to another region.

Kafka Connect + Sink Connector

The Kafka Connect framework can export data to external storage.

  • S3 Sink Connector: backs up to AWS S3 or MinIO
  • HDFS Sink Connector
  • Elasticsearch, PostgreSQL, BigQuery, and others

It supports a wide range of external systems through plugins, but it requires running a Kafka Connect cluster.
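As a rough sketch of what an S3 sink backup could look like, the Python snippet below builds the JSON body that would be sent with POST to the Kafka Connect REST API (/connectors). The connector name, bucket, and region are hypothetical, and the property names are the ones I recall from the Confluent S3 sink connector, so check them against the documentation for the version you install.

```python
import json


def s3_sink_payload(name, topic, bucket, region):
    """Build a Kafka Connect connector definition for an S3 sink (assumed property names)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "s3.bucket.name": bucket,   # hypothetical bucket
            "s3.region": region,        # hypothetical region
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "flush.size": "1000",       # records per object written to S3
        },
    }


payload = s3_sink_payload("s3-backup", "test-topic", "my-kafka-backup", "ap-northeast-2")
print(json.dumps(payload, indent=2))
```

The printed JSON is only the request body; submitting it still requires a running Connect worker with the S3 plugin installed.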

Exporting directly with a Kafka consumer script

A simple Python or Java script can read the data directly and save it to a file.

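from kafka import KafkaConsumer  # provided by the kafka-python package

consumer = KafkaConsumer(
    'your-topic',
    bootstrap_servers='kafka:9092',
    auto_offset_reset='earliest',   # start from the oldest available record
    enable_auto_commit=False,       # a one-off export should not commit offsets
    consumer_timeout_ms=10000       # stop iterating after 10 s without new records
)

# Write one record per line; without consumer_timeout_ms the loop would never end.
with open('backup.json', 'w', encoding='utf-8') as f:
    for message in consumer:
        if message.value is None:   # skip tombstones (records with a null value)
            continue
        f.write(f"{message.value.decode('utf-8')}\n")

consumer.close()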

This is easy to customize but inefficient for large volumes of data.

Storage-level backup (disk-based)

This approach backs up Kafka's log directory (for example /var/lib/kafka) directly. The .log segment files stored under the directories configured in log.dirs are copied with tar, rsync, a snapshot, or a similar tool.

To guarantee consistency, stop the Kafka broker before taking this kind of backup.
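A minimal sketch of the tar variant in Python, using only the standard library. It runs against a throwaway directory that imitates a log directory layout; the directory and file names are invented for the demo, and on a real host you would point archive_log_dir at the broker's actual log directory after stopping the broker.

```python
import tarfile
import tempfile
from pathlib import Path


def archive_log_dir(log_dir, archive_path):
    """Pack log_dir into a gzip-compressed tar archive and return the member names."""
    log_dir = Path(log_dir)
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname keeps paths in the archive relative to the log directory name.
        tar.add(log_dir, arcname=log_dir.name)
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


# Demo on a temporary directory that mimics <log dir>/<topic>-<partition>/<segment>.log
with tempfile.TemporaryDirectory() as tmp:
    fake_logs = Path(tmp) / "kafka-logs"
    partition = fake_logs / "test-topic-0"
    partition.mkdir(parents=True)
    (partition / "00000000000000000000.log").write_bytes(b"demo segment")

    members = archive_log_dir(fake_logs, Path(tmp) / "kafka-logs.tar.gz")
    print(members)
```

Reading the archive back right after writing it is a cheap sanity check that the backup actually contains the segment files.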

Installing schema-registry

Creating the network

docker network create kafka-net

Installing Zookeeper

docker run -d \
  --network kafka-net \
  --name zookeeper \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:7.5.0

Installing schema-registry

docker run -d \
  --name schema-registry \
  --network kafka-net \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
  confluentinc/cp-schema-registry:7.5.0

Verifying schema-registry

acho@DESKTOP-SCOK45O:~$ curl -X POST http://localhost:8081/subjects/my-topic-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}'

{"id":1}

Testing schema-registry

Registering a schema for a topic (Avro example)

acho@DESKTOP-SCOK45O:~$ curl -X POST http://localhost:8081/subjects/test-topic-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}]}"
  }'

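{"id":2}

  • test-topic-value: the schema for the message value. For the key, the subject would be test-topic-key.
  • schema: the Avro schema in JSON form (watch out for the double encoding: the schema is a JSON string embedded in the JSON request body)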
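The double encoding is easier to get right when it is generated rather than typed by hand. The sketch below builds the same request body as the curl command above with two json.dumps calls; subject_for is a hypothetical helper that follows the topic-value / topic-key naming described in the bullets.

```python
import json


def subject_for(topic, is_key=False):
    """Return the subject name for a topic's key or value schema."""
    return f"{topic}-{'key' if is_key else 'value'}"


avro_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
}

# First json.dumps: Avro schema -> JSON string.
# Second json.dumps: wrap that string in the request body, which escapes the inner quotes.
body = json.dumps({"schema": json.dumps(avro_schema)})

print(subject_for("test-topic"))
print(body)
```

The printed body can be passed to curl with -d as is, or sent with any HTTP client using the Content-Type header shown above.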

Listing schemas

acho@DESKTOP-SCOK45O:~$ curl http://localhost:8081/subjects

["my-topic-value","test-topic-value"]

Fetching the latest schema for a subject

curl http://localhost:8081/subjects/test-topic-value/versions/latest

Fetching a specific version of a schema

acho@DESKTOP-SCOK45O:~$ curl http://localhost:8081/subjects/test-topic-value/versions/1

{"subject":"test-topic-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"}

Deleting a registered schema

curl -X DELETE http://localhost:8081/subjects/test-topic-value

Checking schema compatibility

curl -X POST http://localhost:8081/compatibility/subjects/test-topic-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}]}"
  }'
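To give a feel for what the registry decides in this call, here is a deliberately simplified Python sketch of a backward-compatibility rule for flat Avro record schemas: a reader using the new schema can decode old data if every field it expects either existed before with the same type or has a default. This is an illustration only, not the registry's implementation; it ignores type promotion, unions, aliases, and nested records.

```python
def is_backward_compatible(new_schema, old_schema):
    """Simplified check: can a reader using new_schema decode data written with old_schema?"""
    old_fields = {f["name"]: f["type"] for f in old_schema["fields"]}
    for field in new_schema["fields"]:
        if field["name"] in old_fields:
            # Same field name: this sketch only accepts an identical type.
            if field["type"] != old_fields[field["name"]]:
                return False
        elif "default" not in field:
            # A new field without a default cannot be filled in from old data.
            return False
    return True


old = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "int"}, {"name": "name", "type": "string"}]}

# The schema sent in the curl call above: the "name" field is dropped.
dropped = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "int"}]}

# A hypothetical schema that adds a required field.
added = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "int"}, {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}]}

print(is_backward_compatible(dropped, old))
print(is_backward_compatible(added, old))
```

Under this rule dropping a field passes and adding a field without a default fails, which matches the intent of the BACKWARD level that Schema Registry uses by default.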

Using schema-registry

Sending a message

echo '{"name": "Alice"}' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
  • Sent successfully

A schema change occurs

echo '{"name": "Alice", "age": 30}' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
  • The consumer cannot detect the structural change, so errors may occur.
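The two messages above are plain JSON sent through the console producer, so nothing checks them against a schema. As a small illustration of the drift, the Python sketch below compares each message with the single-field User schema used in this post and reports fields the schema does not know about. The helper name is made up, and this is a plain key comparison, not Avro validation.

```python
import json

# The single-field User schema used in this post.
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [{"name": "name", "type": "string"}],
}


def unexpected_fields(raw_message, schema):
    """Return the message keys that are not declared in the record schema."""
    record = json.loads(raw_message)
    known = {field["name"] for field in schema["fields"]}
    return sorted(set(record) - known)


print(unexpected_fields('{"name": "Alice"}', user_schema))             # []
print(unexpected_fields('{"name": "Alice", "age": 30}', user_schema))  # ['age']
```

With a schema-aware serializer this kind of mismatch would surface on the producer side instead of in the consumer.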

Registering the schema

acho@DESKTOP-SCOK45O:~$ curl -X POST http://localhost:8081/subjects/test-topic-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
  }'
{"id":1}
