- Spark Master - 1개
- Spark Worker - 3개
- Spark History Server - 1개
- Pyspark Jupyter Lab - 1개
docker-compose.yaml
version: '3'
services:
pyspark-jupyter:
image: hyunsoolee0506/sparkcluster:3.3.0-v2
container_name: pyspark-jupyter-lab
ports:
- 8888:8888
- 4040:4040
environment:
JUPYTER_PORT: 8888
SPARK_UI_PORT: 4040
volumes:
- spark_data:/data:rw
- ./sparklab:/home/workspace
- /tmp/spark-events-local:/tmp/spark-events
restart: always
spark-master:
image: bde2020/spark-master:3.3.0-hadoop3.3
container_name: spark-master
ports:
- "8080:8080"
- "7077:7077"
environment:
- INIT_DAEMON_STEP=setup_spark
- ENABLE_INIT_DAEMON=true
volumes:
- spark_data:/data:rw
restart: always
spark-worker-1:
image: bde2020/spark-worker:3.3.0-hadoop3.3
container_name: spark-worker-1
depends_on:
- spark-master
ports:
- "8081:8081"
stdin_open: true
tty: true
environment:
- "SPARK_MASTER=spark://spark-master:7077"
volumes:
- spark_data:/data:rw
restart: always
spark-worker-2:
image: bde2020/spark-worker:3.3.0-hadoop3.3
container_name: spark-worker-2
depends_on:
- spark-master
ports:
- "8082:8081"
stdin_open: true
tty: true
environment:
- "SPARK_MASTER=spark://spark-master:7077"
volumes:
- spark_data:/data:rw
restart: always
spark-worker-3:
image: bde2020/spark-worker:3.3.0-hadoop3.3
container_name: spark-worker-3
depends_on:
- spark-master
ports:
- "8083:8081"
stdin_open: true
tty: true
environment:
- "SPARK_MASTER=spark://spark-master:7077"
volumes:
- spark_data:/data:rw
restart: always
spark-history-server:
image: bde2020/spark-history-server:3.3.0-hadoop3.3
container_name: spark-history-server
depends_on:
- spark-master
ports:
- "18081:18081"
volumes:
- /tmp/spark-events-local:/tmp/spark-events
restart: always
volumes:
spark_data:
docker compose up -d
명령어를 통해 파일을 실행시켜주면 6개의 컨테이너가 올라가고 정상적으로 생성이 되었다면 아래 URL에서 각각의 서버에 접속이 가능하다.
- localhost:8888 - Jupyter Lab
- localhost:18081 - Spark History Server
File 형태로 쌓이는 데이터
에 대한 실시간 처리를 해보도록 하겠습니다from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F
spark = (
SparkSession
.builder
.appName("Streaming Process Files")
.master("local[*]")
.getOrCreate()
)
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", True)
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.streaming.schemaInference", True)
spark.streaming.stopGracefullyOnShutdown
: 애플리케이션이 종료될 때, 모든 처리 중인 작업을 완료하고 안전하게 종료되도록 설정하는 옵션spark.sql.shuffle.partitions
: 데이터가 파티셔닝 될 때 파티션 개수를 설정하는 옵션spark.sql.streaming.schemaInference
: 실시간성 데이터 처리시 처리되는 데이터에 대한 schema 정보를 확인할 수 있도록 해주는 옵션
- off(default) : 아무런 변화 없음 (Default 옵션)
- delete : 처리 완료된 파일은 삭제
- archive : 처리 완료된 파일은 'sourceArchiveDir' 경로로 이동
df_01 = (
spark
.readStream
.option("cleanSource", "archive")
.option("maxFilesPerTrigger", 1)
.format("json")
.load("./dataset/") # 읽을 파일이 있는 경로
)
df_01 = (
spark
.readStream
.option("cleanSource", "archive")
.option("sourceArchiveDir", "archive/dir") # archive되는 파일의 이동 경로
.option("maxFilesPerTrigger", 1)
.format("json")
.load("./dataset/") # 읽을 파일이 있는 경로
)
cleanSource
: 읽기 완료된 파일에 대한 처리 옵션sourceArchiveDir
: archive 옵션 선택시 처리 완료된 파일이 옮겨질 경로maxFilesPerTrigger
: streaming시 한 번에 읽을 파일의 수{
'customerId': 'CI00103',
'data': {'devices': [{'deviceId': 'D001',
'measure': 'C',
'status': 'ERROR',
'temperature': 15},
{'deviceId': 'D002',
'measure': 'C',
'status': 'SUCCESS',
'temperature': 16}]},
'eventId': 'e3cb26d3-41b2-49a2-84f3-0156ed8d7502',
'eventOffset': 10001,
'eventPublisher': 'device',
'eventTime': '2023-01-05 11:13:53.643364'
}
df_01 = spark.read.json("./dataset/device_01.json")
df_01.printSchema()
root
|-- customerId: string (nullable = true)
|-- data: struct (nullable = true)
| |-- devices: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- deviceId: string (nullable = true)
| | | |-- measure: string (nullable = true)
| | | |-- status: string (nullable = true)
| | | |-- temperature: long (nullable = true)
|-- eventId: string (nullable = true)
|-- eventOffset: long (nullable = true)
|-- eventPublisher: string (nullable = true)
|-- eventTime: string (nullable = true)
df_02 = df_01.withColumn("data_devices", F.explode("data.devices")).drop("data")
df_03 = \
df_02 \
.withColumn("deviceId", F.col("data_devices.deviceId")) \
.withColumn("measure", F.col("data_devices.measure")) \
.withColumn("status", F.col("data_devices.status")) \
.withColumn("temperature", F.col("data_devices.temperature")) \
.drop("data_devices")
df_03.printSchema()
root
|-- customerId: string (nullable = true)
|-- eventId: string (nullable = true)
|-- eventOffset: long (nullable = true)
|-- eventPublisher: string (nullable = true)
|-- eventTime: string (nullable = true)
|-- deviceId: string (nullable = true)
|-- measure: string (nullable = true)
|-- status: string (nullable = true)
|-- temperature: long (nullable = true)
writeStream
코드를 작성한다.outputMode
는 3가지 종류가 존재하며 데이터를 받아오는 Source의 성격에 따라서 사용하지 못하는 옵션도 존재한다.
- complete : 지금까지 처리된 모든 행을 출력
- append : 마지막 트리거 이후 새롭게 처리된 행만 출력
- update : 마지막 트리거 이후 업데이트된 행만 출력
stream = (
df_03
.writeStream
.format("console")
.outputMode("append")
.option("checkpointLocation", "checkpoint_dir/file_streaming")
.start()
)
stream.awaitTermination()
stream = (
df_03
.writeStream
.format("memory")
.queryName("memory") # spark sql에 저장될 테이블 이름
.outputMode("append")
.option("checkpointLocation", "checkpoint_dir/file_streaming")
.start()
)
stream.awaitTermination()
stream = (
df_03
.writeStream
.format("csv")
.outputMode("append")
.option("header", "true")
.option("path", "output/02_file_streaming") # CSV 파일 저장 경로
.option("checkpointLocation", "checkpoint_dir/file_streaming")
.start()
)
stream.awaitTermination()
checkpointLocation
: 데이터 처리 내역이 저장되는 경로로 세션이 종료되더라도 해당 디렉토리에 저장된 내용을 참고하여 이미 처리된 데이터들에 대해서는 중복 작업을 하지 않는다.readStream
과 쓰는 writeStream
에 여러가지 옵션을 줄 수 있기 때문에 아래의 전체 코드에서는 off 옵션으로 읽고 memory에 결과를 저장
하는 작업을 수행하는 코드로 작성을 할 것이다.from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F
spark = (
SparkSession
.builder
.appName("Streaming Process Files")
.master("local[*]")
.getOrCreate()
)
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", True)
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.streaming.schemaInference", True)
df_01 = (
spark
.readStream
.option("maxFilesPerTrigger", 1)
.format("json")
.load("./dataset/")
)
df_02 = (
df_01
.withColumn("data_devices", F.explode("data.devices"))
.drop("data")
)
df_03 = (
df_02
.withColumn("deviceId", F.col("data_devices.deviceId"))
.withColumn("measure", F.col("data_devices.measure"))
.withColumn("status", F.col("data_devices.status"))
.withColumn("temperature", F.col("data_devices.temperature"))
.drop("data_devices")
)
stream = (
df_03
.writeStream
.format("memory")
.queryName("memory")
.outputMode("append")
.option("checkpointLocation", "checkpoint_dir/02_file_streaming")
.start()
)
stream.awaitTermination()
spark.sql("SELECT * FROM memory").show()
+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
|customerId| eventId|eventOffset|eventPublisher| eventTime|deviceId|measure| status|temperature|
+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D004| C|SUCCESS| 20|
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D004| C|SUCCESS| 1|
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D002| D|SUCCESS| 21|
| CI00108|aa90011f-3967-496...| 10003| device|2023-01-05 11:13:...| D004| C|SUCCESS| 16|
| CI00103|e3cb26d3-41b2-49a...| 10001| device|2023-01-05 11:13:...| D001| C| ERROR| 15|
| CI00103|e3cb26d3-41b2-49a...| 10001| device|2023-01-05 11:13:...| D002| C|SUCCESS| 16|
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D004| C|SUCCESS| 20|
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D004| C|SUCCESS| 1|
| CI00101|1450324a-c546-417...| 10038| device|2023-01-05 11:13:...| D002| C|SUCCESS| 21|
+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+