- zookeeper x 1
- kafka node x 3
- kafdrop UI x 1
docker-compose.yaml
version: '3'
services:
  zookeeper:
    image: zookeeper:3.7
    hostname: zookeeper
    user: '0:0'
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
    volumes:
      - /mnt/d/kafka/data/zookeeper/data:/data
      - /mnt/d/kafka/data/zookeeper/datalog:/datalog
    restart: always
    networks:
      - dockercompose_dataops
  kafka1:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka1
    user: '0:0'
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /mnt/d/kafka/data/broker1:/var/lib/kafka
    restart: always
    depends_on:
      - zookeeper
    networks:
      - dockercompose_dataops
  kafka2:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka2
    user: '0:0'
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /mnt/d/kafka/data/broker2:/var/lib/kafka
    restart: always
    depends_on:
      - zookeeper
    networks:
      - dockercompose_dataops
  kafka3:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka3
    user: '0:0'
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /mnt/d/kafka/data/broker3:/var/lib/kafka
    restart: always
    depends_on:
      - zookeeper
    networks:
      - dockercompose_dataops
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKER_CONNECT: "kafka1:19091"
    restart: always
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - dockercompose_dataops
networks:
  dockercompose_dataops:
    external: true
If the compose file comes up without any port conflicts, all five containers that make up the Kafka cluster should be in the Up state. Note that the dockercompose_dataops network is declared as external, so it must already exist (for example, created with docker network create dockercompose_dataops) before the stack is started.
Browsing to port 9000 brings up the kafdrop UI.
In the Topic section at the bottom of the page, clicking the + New button creates a new topic in a couple of clicks. We will create a topic named "demo" and use it for the rest of this walkthrough. (The topic can also be created programmatically, as sketched below.)
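If you prefer not to click through the UI, a minimal sketch using kafka-python's admin client is shown here. This is not part of the original setup: the partition and replication counts are assumptions (the batch output later in this post suggests three partitions), and the bootstrap addresses assume the script runs on the dockercompose_dataops network (from the host, use the external listeners such as 127.0.0.1:9091 instead).

from kafka.admin import KafkaAdminClient, NewTopic

# Assumed addresses: the internal listeners on the dockercompose_dataops network
admin = KafkaAdminClient(bootstrap_servers=["kafka1:19091", "kafka2:19092", "kafka3:19093"])

# 3 partitions / replication factor 3 are illustrative assumptions, not values from the post
admin.create_topics(new_topics=[NewTopic(name="demo", num_partitions=3, replication_factor=3)])
print(admin.list_topics())  # the list should now include "demo"
admin.close()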
fakedata.py
from datetime import datetime

import shortuuid
from faker import Faker

fake = Faker()

def create_fakeuser() -> dict:
    # Build a flat dict from a subset of Faker's profile() output
    fake_profile = fake.profile()
    key_list = ["name", "ssn", "job", "residence", "blood_group", "sex", "birthdate"]
    fake_dict = dict()
    for key in key_list:
        fake_dict[key] = fake_profile[key]
    # Add a short unique id and normalize the date/timestamp formats
    fake_dict["uuid"] = shortuuid.uuid()
    fake_dict["birthdate"] = fake_dict["birthdate"].strftime("%Y%m%d")
    fake_dict["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return fake_dict
Sample output:
{'name': 'Donald Smith',
'ssn': '571-96-7625',
'job': 'Clinical embryologist',
'residence': '39473 Bailey Islands Apt. 406\nLake Jamesmouth, WV 65741',
'blood_group': 'B+',
'sex': 'M',
'birthdate': '19560708',
'uuid': 'by7vqUMcNpZQBqaScCEuYN',
'timestamp': '2024-05-23 01:27:12'}
The generated records are then sent to the demo topic.

import time, json
from kafka import KafkaProducer
from fakedata import create_fakeuser

topic_name = "demo"
producer = KafkaProducer(
    bootstrap_servers=["kafka1:19091", "kafka2:19092", "kafka3:19093"]
)

ORDER_LIMIT = 100
for i in range(1, ORDER_LIMIT + 1):
    data = create_fakeuser()
    producer.send(topic_name, json.dumps(data).encode("utf-8"))
    print("==" * 30)
    print(data)
    print(f">>>>>>>>>>> {i} MESSAGE SENT <<<<<<<<<<<<")
    time.sleep(2)

# Make sure any buffered messages are delivered before the process exits
producer.flush()
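As a side note, kafka-python can also do the JSON encoding for you through a value_serializer, and a message key can be attached so that records with the same key always land in the same partition. The sketch below is a hedged variation on the producer above, not part of the original post; keying by the uuid field is purely an illustration.

import json
from kafka import KafkaProducer
from fakedata import create_fakeuser

producer = KafkaProducer(
    bootstrap_servers=["kafka1:19091", "kafka2:19092", "kafka3:19093"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # encode dicts automatically
    key_serializer=lambda k: k.encode("utf-8"),
)

data = create_fakeuser()
producer.send("demo", value=data, key=data["uuid"])  # same key -> same partition
producer.flush()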
The data sent to the demo topic now has to be consumed and processed, with Spark Streaming playing the consumer role. To simply fetch and read what is already stored in the demo topic, spark.read is enough; to read it continuously as a stream, spark.readStream has to be used instead. In both cases the Kafka connector must be pulled in through the spark.jars.packages setting.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

spark = (
    SparkSession
    .builder
    .appName("kafka_streaming")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .master("local[*]")
    .getOrCreate()
)
First, write a one-off batch job that uses spark.read to pull the data currently stored in the topic and process it.

topic_name = "demo"
bootstrap_servers = "kafka1:19091,kafka2:19092,kafka3:19093"
kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "earliest") \
    .load()

kafka_df.show()
+----+--------------------+-----+---------+------+--------------------+-------------+
| key| value|topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[7B 22 6E 61 6D 6...| demo| 2| 0|2024-05-20 01:22:...| 0|
|null|[7B 22 6E 61 6D 6...| demo| 0| 0|2024-05-20 01:22:...| 0|
|null|[7B 22 6E 61 6D 6...| demo| 2| 1|2024-05-20 01:22:...| 0|
|null|[7B 22 6E 61 6D 6...| demo| 2| 2|2024-05-20 01:53:...| 0|
|null|[7B 22 6E 61 6D 6...| demo| 1| 0|2024-05-20 01:53:...| 0|
+----+--------------------+-----+---------+------+--------------------+-------------+
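Before parsing, it helps to look at the schema the Kafka source exposes: key and value arrive as binary columns alongside topic/partition/offset metadata, which is why value has to be cast to a string and parsed with a user-supplied schema below. Calling printSchema() should show roughly the following:

kafka_df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)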
## Declare the schema for the JSON payload
schema = T.StructType([
    T.StructField("birthdate", T.StringType()),
    T.StructField("blood_group", T.StringType()),
    T.StructField("job", T.StringType()),
    T.StructField("name", T.StringType()),
    T.StructField("residence", T.StringType()),
    T.StructField("sex", T.StringType()),
    T.StructField("ssn", T.StringType()),
    T.StructField("uuid", T.StringType()),
    T.StructField("timestamp", T.TimestampType()),
])

## Parse the value column and flatten it into a table
value_df = kafka_df.select(F.from_json(F.col("value").cast("string"), schema).alias("value"))
processed_df = value_df.selectExpr(
    "value.birthdate",
    "value.blood_group",
    "value.job",
    "value.name",
    "value.residence",
    "value.sex",
    "value.ssn",
    "value.uuid",
    "value.timestamp"
)

processed_df.show()
+---------+-----------+--------------------+-----------------+--------------------+---+-----------+--------------------+-------------------+
|birthdate|blood_group| job| name| residence|sex| ssn| uuid| timestamp|
+---------+-----------+--------------------+-----------------+--------------------+---+-----------+--------------------+-------------------+
| 20200724| B+|Medical sales rep...| Rachel Moore|166 Peters Valley...| F|066-83-1189|9qA2gsGoA2mm9Ka9x...|2024-05-20 01:22:53|
| 19690318| A+| Arboriculturist| Jeffery Thompson|999 Simmons River...| M|247-66-1932|PPtpVZVJvzm6TCByK...|2024-05-20 01:54:00|
| 19360621| A+|Lecturer, further...| Richard Trevino|USCGC Burnett\nFP...| M|354-23-9711|LyF7YaaZhU3PZzHoP...|2024-05-20 01:54:06|
| 20240305| A+|Accountant, chart...|Taylor Washington|2495 Nelson Field...| M|796-40-1135|VSAhzdQLTcnkK6YsA...|2024-05-20 01:54:08|
| 19590717| B-|Maintenance engineer| Barbara Johnson|483 Selena Locks ...| F|877-04-5763|X4myut2ntC8WpH4gf...|2024-05-20 01:54:10|
+---------+-----------+--------------------+-----------------+--------------------+---+-----------+--------------------+-------------------+
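Once the JSON is parsed, processed_df is an ordinary DataFrame, so any batch operation applies. As a quick illustration (not in the original post), here is a sanity check on the blood-group distribution and a snapshot written to Parquet; the output path is an arbitrary example.

# Distribution of blood groups in the current snapshot
processed_df.groupBy("blood_group").count().orderBy("blood_group").show()

# Persist the snapshot to Parquet; the path is just an illustrative choice
processed_df.write.mode("overwrite").parquet("output/demo_users_batch")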
df_console = processed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "checkpoint_dir/kafka_streaming") \
    .trigger(processingTime="5 seconds") \
    .start()

df_console.awaitTermination()
Note that writeStream can only be called on a streaming DataFrame, i.e. one created with spark.readStream rather than spark.read. The options used above mean the following (a sketch contrasting output modes follows this list):
- .outputMode("append"): only rows newly processed since the last trigger are written out.
- .option("checkpointLocation", "checkpoint_dir/kafka_streaming"): a directory where processing progress is recorded so that the same data is not processed twice.
- .trigger(processingTime="5 seconds"): how often the sink is written.
- .awaitTermination(): keeps the program running until the writeStream query terminates.
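To make the difference between output modes concrete, here is a minimal sketch of a streaming aggregation. An aggregation cannot run in append mode without a watermark, so it is written in complete mode instead. This is not part of the original post: it assumes a processed_df built from spark.readStream (like the one in kafka_streaming.py below), and the checkpoint path is an arbitrary example.

agg_query = (
    processed_df.groupBy("blood_group").count()
    .writeStream
    .format("console")
    .outputMode("complete")  # re-emits the full aggregated table on every trigger
    .option("checkpointLocation", "checkpoint_dir/blood_group_counts")
    .trigger(processingTime="5 seconds")
    .start()
)
agg_query.awaitTermination()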
Now put everything together into a single script (kafka_streaming.py) that reads the data in real time through spark.readStream, processes it, and prints it to the console.
kafka_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

topic_name = "demo"
bootstrap_servers = "kafka1:19091,kafka2:19092,kafka3:19093"

spark = (
    SparkSession
    .builder
    .appName("kafka_streaming")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .master("local[*]")
    .getOrCreate()
)

kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)
schema = T.StructType([
    T.StructField("birthdate", T.StringType()),
    T.StructField("blood_group", T.StringType()),
    T.StructField("job", T.StringType()),
    T.StructField("name", T.StringType()),
    T.StructField("residence", T.StringType()),
    T.StructField("sex", T.StringType()),
    T.StructField("ssn", T.StringType()),
    T.StructField("uuid", T.StringType()),
    T.StructField("timestamp", T.TimestampType()),
])

value_df = kafka_df.select(F.from_json(F.col("value").cast("string"), schema).alias("value"))
processed_df = value_df.selectExpr(
    "value.birthdate",
    "value.blood_group",
    "value.job",
    "value.name",
    "value.residence",
    "value.sex",
    "value.ssn",
    "value.uuid",
    "value.timestamp"
)

df_console = processed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "checkpoint_dir/kafka_streaming") \
    .trigger(processingTime="5 seconds") \
    .start()

df_console.awaitTermination()
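The console sink is only useful for inspection. The same streaming query can instead write Parquet files, which is closer to a real pipeline. This is a hedged variation, not part of the original script: the output path and checkpoint directory are arbitrary examples, and the file sink only supports append mode.

df_files = (
    processed_df.writeStream
    .format("parquet")
    .option("path", "output/demo_users_stream")                   # example output directory
    .option("checkpointLocation", "checkpoint_dir/demo_users_files")
    .outputMode("append")                                          # the file sink requires append
    .trigger(processingTime="5 seconds")
    .start()
)
df_files.awaitTermination()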
Run kafka_streaming.py and leave it waiting, then run the producer script (the one that imports create_fakeuser from fakedata.py) to push fake data into the Kafka cluster; the new records are printed to the console at each trigger.