๐ณ๏ธโ๐ [๊ถ๊ธํ์ ]
๐๋ชฉ์ฐจ
docker run -d \
--name kafka \
-p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest
| ์ต์ | ์ค๋ช |
|---|---|
-d | -d: ์ปจํ
์ด๋๋ฅผ ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์คํ |
--name kafka | ์ปจํ
์ด๋ ์ด๋ฆ์ kafka๋ก ์ค์ |
-e KAFKA_NODE_ID=1 | Kafka ๋
ธ๋์ ID๋ฅผ 1๋ก ์ค์ . Kafka ํด๋ฌ์คํฐ ๋ด์์ ๊ณ ์ ํ ID๋ก ์ฌ์ฉ๋ฉ๋๋ค. |
-e KAFKA_PROCESS_ROLES=broker,controller | Kafka๊ฐ broker์ controller ์ญํ ์ ๋์์ ์ํํ๋๋ก ์ค์ . |
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 | Kafka๊ฐ PLAINTEXT์ CONTROLLER ๋ฆฌ์ค๋๋ก ๊ฐ๊ฐ 9092์ 9093 ํฌํธ์์ ๋ฆฌ์ค๋ํ๋๋ก ์ค์ |
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 | ์ธ๋ถ์์ ์ ๊ทผํ ์ ์๋ Kafka ๋ธ๋ก์ปค์ ์ฃผ์๋ฅผ localhost:9092๋ก ์ค์ |
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER | Kafka ์ปจํธ๋กค๋ฌ๊ฐ ์ฌ์ฉํ ๋ฆฌ์ค๋์ ์ด๋ฆ์ CONTROLLER๋ก ์ค์ . |
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT | ๋ฆฌ์ค๋์ ๋ํด ์ฌ์ฉํ ๋ณด์ ํ๋กํ ์ฝ์ ์ค์ . |
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 | Kafka ์ปจํธ๋กค๋ฌ๊ฐ ์ฌ์ฉํ ์ฟผ๋ผ ๋ ธ๋๋ฅผ ์ค์ . |
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 | ์คํ์
ํ ํฝ์ ๋ณต์ ๊ณ์๋ฅผ 1๋ก ์ค์ ํ์ฌ ๋ณต์ ๋ณธ์ 1๊ฐ๋ก ์ ํํฉ๋๋ค. |
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 | ํธ๋์ญ์
์ํ ๋ก๊ทธ์ ๋ณต์ ๊ณ์๋ฅผ 1๋ก ์ค์ ํ์ฌ ๋ณต์ ๋ณธ์ 1๊ฐ๋ก ์ ํํฉ๋๋ค. |
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 | ํธ๋์ญ์ ์ํ ๋ก๊ทธ์ ์ต์ ISR(In-Sync Replicas)์ ์ค์ . |
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 | ์๋น์ ๊ทธ๋ฃน์ ์ด๊ธฐ ๋ฆฌ๋ฐธ๋ฐ์ค๋ฅผ ์ฆ์ ์ํํ๋๋ก ์ง์ฐ ์๊ฐ์ 0์ผ๋ก ์ค์ . |
-e KAFKA_NUM_PARTITIONS=3 | ๊ธฐ๋ณธ์ ์ผ๋ก ์์ฑ๋๋ Kafka ํ ํฝ์ ํํฐ์
์๋ฅผ 3์ผ๋ก ์ค์ . |
apache/kafka:latest | Docker์์ ์คํํ Kafka์ ์ต์ ๋ฒ์ ์ด๋ฏธ์ง๋ฅผ ์ฌ์ฉํฉ๋๋ค. |
docker exec -it kafka bash
cf9fccdc5f12:/var/lib/kafka/data$ export PATH=$PATH:/opt/kafka/bin/
8938869c94ce:/$ java --version
openjdk 21.0.6 2025-01-21 LTS
OpenJDK Runtime Environment Temurin-21.0.6+7 (build 21.0.6+7-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.6+7 (build 21.0.6+7-LTS, mixed mode, sharing)
26d922b2998b:/$ kafka-topics.sh --version
4.0.0
cf9fccdc5f12:~$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.
cf9fccdc5f12:~$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic2
Created topic test-topic2.
cf9fccdc5f12:~$ kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
test-topic
test-topic2
cf9fccdc5f12:~$ kafka-topics.sh --delete --topic test-topic2 --bootstrap-server localhost:9092
cf9fccdc5f12:~$ kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
test-topic
cf9fccdc5f12:~$ kafka-configs.sh --describe --topic test-topic --bootstrap-server localhost:9092
Dynamic configs for topic test-topic are:
cf9fccdc5f12:~$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
>
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
console-consumer-73216
console-consumer-34026
acho@DESKTOP-SCOK45O:~$ rm kafka_2.12-3.7.2.tgz
acho@DESKTOP-SCOK45O:~$ tar -xvf kafka_2.13-4.0.0.tgz
Kafka์ ๋ฐฑ์ ์ ์ ํต์ ์ธ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ฒ๋ผ "์ค๋ ์ท"์ ์ฐ๋ ๋ฐฉ์๋ณด๋ค๋ topic ๋ฐ์ดํฐ๋ฅผ ๋ณต์ ํ๊ฑฐ๋ ๋ด๋ณด๋ด๋(export) ๋ฐฉ์์ผ๋ก ์ํํ๋ ๊ฒ ์ผ๋ฐ์ ์ด๋ค. ์๋๋ Kafka ๋ฐฑ์ ์ ์ํํ ์ ์๋ ๋ํ์ ์ธ ๋ฐฉ๋ฒ๋ค์ด๋ค
Kafka์ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฅธ Kafka ํด๋ฌ์คํฐ๋ก ๋ณต์ (๋ฏธ๋ฌ๋ง)ํ ์ ์๋ ๊ณต์ ๋๊ตฌ์ด๋ค.
์ค์๊ฐ ๋ณต์ , ์ฅ์ ๋ณต๊ตฌ ํด๋ฌ์คํฐ ๊ตฌ์ฑ ๊ฐ๋ฅํ๋ฉฐ, DR(์ฌํด ๋ณต๊ตฌ)๋ ๋ค๋ฅธ ์ง์ญ์ผ๋ก ๋๊ธฐํํ ๋ ์ฌ์ฉํ๋ค.
Kafka Connect ํ๋ ์์ํฌ๋ฅผ ์ฌ์ฉํด ๋ฐ์ดํฐ๋ฅผ ์ธ๋ถ ์ ์ฅ์๋ก ๋ด๋ณด๋ผ ์ ์๋ค.
๋ค์ํ ์ธ๋ถ ์์คํ ์ง์, ํ๋ฌ๊ทธ์ธ ํ์์ด ์กด์ฌํ๋ Kafka Connect ํด๋ฌ์คํฐ ์ฌ์ฉ์ด ์๊ตฌ๋๋ค
๊ฐ๋จํ Python์ด๋ Java ์คํฌ๋ฆฝํธ๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ง์ ์ฝ์ด์ ํ์ผ๋ก ์ ์ฅํ ์ ์๋ค.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'your-topic',
bootstrap_servers='kafka:9092',
auto_offset_reset='earliest',
enable_auto_commit=False
)
with open('backup.json', 'w') as f:
for message in consumer:
f.write(f"{message.value.decode('utf-8')}\n")
์ปค์คํฐ๋ง์ด์ง ์ฌ์ด ๋ฐ๋ฉด ๋๋ ๋ฐ์ดํฐ์๋ ๋นํจ์จ์ ์ด๋ค.
Kafka์ ๋ก๊ทธ ๋๋ ํ ๋ฆฌ(/var/lib/kafka ๋ฑ)๋ฅผ ์ง์ ๋ฐฑ์ ํ๋ ๋ฐฉ์์ด๋ค. kafka.log.dirs์ ์ ์ฅ๋๋ .log ํ์ผ์ tar, rsync, snapshot ๋ฑ์ผ๋ก ๋ณต์ฌํ๋ค.
์ด ๋ฐฉ๋ฒ์ Kafka ๋ธ๋ก์ปค๋ฅผ ์ค์งํ ํ ๋ฐฑ์ ํด์ผ ์ผ๊ด์ฑ ๋ณด์ฅ๋๋ค.
docker network create kafka-net
docker run -d \
--network kafka-net \
--name zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:7.5.0
docker run -d \
--name schema-registry \
--network kafka-net \
-p 8081:8081 \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
confluentinc/cp-schema-registry:7.5.0
acho@DESKTOP-SCOK45O:~curl -X POST http://localhost:8081/subjects/my-topic-value/versions \ \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}'
{"id":1}
acho@DESKTOP-SCOK45O:~$ curl -X POST http://localhost:8081/subjects/test-topic-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}]}"
}'
{"id":2}
acho@DESKTOP-SCOcurl http://localhost:8081/subjectssubjects
["my-topic-value","test-topic-value"]
curl http://localhost:8081/subjects/test-topic-value/versions/latest
acho@DESKTOP-SCOK45O:~$ curl http://localhost:8081/subjects/test-topic-value/versions/1
{"subject":"test-topic-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"}
curl -X DELETE http://localhost:8081/subjects/test-topic-value
curl -X POST http://localhost:8081/compatibility/subjects/test-topic-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}]}"
}'
echo '{"name": "Alice"}' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
echo '{"name": "Alice", "age": 30}' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
acho@DESKTOP-SCOK45O:~$ curl -X POST http://localhost:8081/subjects/test-topic-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
}'
{"id":1}