[Spark] Multi Node Spark Cluster에서 Spark Streaming을 통한 실시간 파일 처리

NewNewDaddy·2024년 5월 17일
0

SPARK

목록 보기
14/16
post-thumbnail
post-custom-banner

🔹 0. INTRO

  • Spark Streaming은 실시간으로 발생되는 데이터들을 받아 group by나 aggregation 등의 처리 작업들 수행할 수 있는 아파치 스파크의 확장 기능입니다.
  • 실제로 보면 완전 실시간으로 데이터가 조합되며 처리되는 구조는 아니고, 아주 짧은 배치 간격(Micro-Batch)으로 데이터를 처리하는 것이기 때문에 간격이 매우 짧은 배치 처리라고 할 수도 있지만 이 또한 실시간 Stream 처리라고 볼 수 있다.
  • Kafka, Kinesis 등의 툴들을 통해 실시간으로 전달되는 데이터를 받아와 처리하는 과정에서 Spark Streaming이 쓰일 수 있다.

🔹 1. Spark Cluster 환경 구성

  • Spark Streaming 관련 실습을 위해 단순하게 pyspark 환경이 구성되어 있는 jupyter lab 이미지를 Docker로 올려도 되지만 Spark Cluster 환경을 구성하여 partition이나 shuffle 등을 성능 튜닝 작업이 더 용이하도록 아래와 같이 도커 컨테이너를 구성한다.
    • Spark Master - 1개
    • Spark Worker - 3개
    • Spark History Server - 1개
    • Pyspark Jupyter Lab - 1개
  • Pyspark Jupyter Lab에 사용된 이미지는 필요한 라이브러리들이 미리 install된 커스텀 이미지를 사용하였으며 전체적인 Docker Compose 파일은 아래와 같다.

docker-compose.yaml

version: '3'
services:
  pyspark-jupyter:
    image: hyunsoolee0506/sparkcluster:3.3.0-v2
    container_name: pyspark-jupyter-lab
    ports:
        - 8888:8888
        - 4040:4040
    environment:
        JUPYTER_PORT: 8888
        SPARK_UI_PORT: 4040
    volumes:
        - spark_data:/data:rw
        - ./sparklab:/home/workspace
        - /tmp/spark-events-local:/tmp/spark-events
    restart: always
  
  spark-master:
    image: bde2020/spark-master:3.3.0-hadoop3.3
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
      - ENABLE_INIT_DAEMON=true
    volumes:
        - spark_data:/data:rw
    restart: always

  spark-worker-1:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    stdin_open: true
    tty: true
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
        - spark_data:/data:rw
    restart: always

  spark-worker-2:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    stdin_open: true
    tty: true
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
        - spark_data:/data:rw
    restart: always

  spark-worker-3:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-3
    depends_on:
      - spark-master
    ports:
      - "8083:8081"
    stdin_open: true
    tty: true
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
        - spark_data:/data:rw
    restart: always

  spark-history-server:
    image: bde2020/spark-history-server:3.3.0-hadoop3.3
    container_name: spark-history-server
    depends_on:
      - spark-master
    ports:
      - "18081:18081"
    volumes:
      - /tmp/spark-events-local:/tmp/spark-events
    restart: always

volumes:
  spark_data:
  • docker compose up -d 명령어를 통해 파일을 실행시켜주면 6개의 컨테이너가 올라가고 정상적으로 생성이 되었다면 아래 URL에서 각각의 서버에 접속이 가능하다.
    • localhost:8888 - Jupyter Lab
    • localhost:18081 - Spark History Server

🔹 2. Spark Streaming - File

  • Spark Streaming 처리를 할 수 있는 Source는 아래와 같이 크게 세가지로 분류할 수 있을 것입니다.
    1. Socket을 통해 들어오는 데이터
    2. File형태로 쌓이는 데이터
    3. Kafka 등 실시간 수집 Tool을 통해 들어오는 데이터
  • 이번 글에서는 이 중 두 번째인 File 형태로 쌓이는 데이터에 대한 실시간 처리를 해보도록 하겠습니다

1) Spark Session 생성

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F

spark = (
    SparkSession 
    .builder 
    .appName("Streaming Process Files") 
    .master("local[*]") 
    .getOrCreate()
)

spark.conf.set("spark.streaming.stopGracefullyOnShutdown", True)
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.streaming.schemaInference", True)
  • spark.streaming.stopGracefullyOnShutdown : 애플리케이션이 종료될 때, 모든 처리 중인 작업을 완료하고 안전하게 종료되도록 설정하는 옵션
  • spark.sql.shuffle.partitions : 데이터가 파티셔닝 될 때 파티션 개수를 설정하는 옵션
  • spark.sql.streaming.schemaInference : 실시간성 데이터 처리시 처리되는 데이터에 대한 schema 정보를 확인할 수 있도록 해주는 옵션

🔹 3. File Streaming Read 코드 작성

  • 특정 경로에 저장되는 file을 streaming 할 때 줄 수 있는 옵션에는 3가지가 있다.
    1. off(default) : 아무런 변화 없음 (Default 옵션)
    2. delete : 처리 완료된 파일은 삭제
    3. archive : 처리 완료된 파일은 'sourceArchiveDir' 경로로 이동

1) off/delete option

df_01 = (
    spark
    .readStream
    .option("cleanSource", "archive")
    .option("maxFilesPerTrigger", 1)
    .format("json")
    .load("./dataset/") # 읽을 파일이 있는 경로
)

2) archive option

df_01 = (
    spark
    .readStream
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "archive/dir") # archive되는 파일의 이동 경로
    .option("maxFilesPerTrigger", 1)
    .format("json")
    .load("./dataset/") # 읽을 파일이 있는 경로
)
  • cleanSource : 읽기 완료된 파일에 대한 처리 옵션
  • sourceArchiveDir : archive 옵션 선택시 처리 완료된 파일이 옮겨질 경로
  • maxFilesPerTrigger : streaming시 한 번에 읽을 파일의 수

🔹 4. Read한 File 데이터 처리 코드 작성

  • 실시간으로 들어오는 파일은 아래와 같은 형태를 가지고 있다.(예시)
{
  'customerId': 'CI00103',
 'data': {'devices': [{'deviceId': 'D001',
                       'measure': 'C',
                       'status': 'ERROR',
                       'temperature': 15},
                      {'deviceId': 'D002',
                       'measure': 'C',
                       'status': 'SUCCESS',
                       'temperature': 16}]},
 'eventId': 'e3cb26d3-41b2-49a2-84f3-0156ed8d7502',
 'eventOffset': 10001,
 'eventPublisher': 'device',
 'eventTime': '2023-01-05 11:13:53.643364'
}
  • Nested된 구조이기 때문에 explode하는 과정이 필요한데 아래 코드를 거치게되면 데이터의 nested 구조가 flatten된다.
df_01 = spark.read.json("./dataset/device_01.json")
df_01.printSchema()

    root
     |-- customerId: string (nullable = true)
     |-- data: struct (nullable = true)
     |    |-- devices: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- deviceId: string (nullable = true)
     |    |    |    |-- measure: string (nullable = true)
     |    |    |    |-- status: string (nullable = true)
     |    |    |    |-- temperature: long (nullable = true)
     |-- eventId: string (nullable = true)
     |-- eventOffset: long (nullable = true)
     |-- eventPublisher: string (nullable = true)
     |-- eventTime: string (nullable = true)
df_02 = df_01.withColumn("data_devices", F.explode("data.devices")).drop("data")

df_03 = \
    df_02 \
        .withColumn("deviceId", F.col("data_devices.deviceId")) \
        .withColumn("measure", F.col("data_devices.measure")) \
        .withColumn("status", F.col("data_devices.status")) \
        .withColumn("temperature", F.col("data_devices.temperature")) \
        .drop("data_devices")

df_03.printSchema()

    root
     |-- customerId: string (nullable = true)
     |-- eventId: string (nullable = true)
     |-- eventOffset: long (nullable = true)
     |-- eventPublisher: string (nullable = true)
     |-- eventTime: string (nullable = true)
     |-- deviceId: string (nullable = true)
     |-- measure: string (nullable = true)
     |-- status: string (nullable = true)
     |-- temperature: long (nullable = true)

🔹 5. 처리된 DataFrame 저장 코드 작성

  • 위에서 flatten 처리된 실시간 파일 데이터에 대하여 저장할 수 있는 writeStream 코드를 작성한다.
  • outputMode는 3가지 종류가 존재하며 데이터를 받아오는 Source의 성격에 따라서 사용하지 못하는 옵션도 존재한다.
    1. complete : 지금까지 처리된 모든 행을 출력
    2. append : 마지막 트리거 이후 새롭게 처리된 행만 출력
    3. update : 마지막 트리거 이후 업데이트된 행만 출력

1) console에 출력

  • 데이터가 처리된 결과물을 console 출력으로 확인할 수 있다.
stream = (
    df_03
    .writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir/file_streaming")
    .start()
)

stream.awaitTermination()

2) memory에 저장

stream = (
    df_03
    .writeStream
    .format("memory")
    .queryName("memory") # spark sql에 저장될 테이블 이름
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir/file_streaming")
    .start()
)

stream.awaitTermination()

3) csv/parquet 등 파일 형태로 저장

stream = (
    df_03
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("header", "true")
    .option("path", "output/02_file_streaming") # CSV 파일 저장 경로
    .option("checkpointLocation", "checkpoint_dir/file_streaming")
    .start()
)

stream.awaitTermination()
  • checkpointLocation : 데이터 처리 내역이 저장되는 경로로 세션이 종료되더라도 해당 디렉토리에 저장된 내용을 참고하여 이미 처리된 데이터들에 대해서는 중복 작업을 하지 않는다.

🔹 6. 전체 코드

  • 파일을 읽는 readStream과 쓰는 writeStream에 여러가지 옵션을 줄 수 있기 때문에 아래의 전체 코드에서는 off 옵션으로 읽고 memory에 결과를 저장하는 작업을 수행하는 코드로 작성을 할 것이다.
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F

spark = (
    SparkSession 
    .builder 
    .appName("Streaming Process Files") 
    .master("local[*]") 
    .getOrCreate()
)

spark.conf.set("spark.streaming.stopGracefullyOnShutdown", True)
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.streaming.schemaInference", True)

df_01 = (
    spark
    .readStream
    .option("maxFilesPerTrigger", 1)
    .format("json")
    .load("./dataset/")
	)

df_02 = (
        df_01
        .withColumn("data_devices", F.explode("data.devices"))
        .drop("data")
        )

df_03 = (
        df_02
        .withColumn("deviceId", F.col("data_devices.deviceId"))
        .withColumn("measure", F.col("data_devices.measure"))
        .withColumn("status", F.col("data_devices.status"))
        .withColumn("temperature", F.col("data_devices.temperature"))
        .drop("data_devices")
        )
        
stream = (
    df_03
    .writeStream
    .format("memory")
    .queryName("memory")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir/02_file_streaming")
    .start()
	)

stream.awaitTermination()
  • streaming 처리 완료된 파일 읽기
spark.sql("SELECT * FROM memory").show()

    +----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
    |customerId|             eventId|eventOffset|eventPublisher|           eventTime|deviceId|measure| status|temperature|
    +----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D004|      C|SUCCESS|         20|
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D004|      C|SUCCESS|          1|
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D002|      D|SUCCESS|         21|
    |   CI00108|aa90011f-3967-496...|      10003|        device|2023-01-05 11:13:...|    D004|      C|SUCCESS|         16|
    |   CI00103|e3cb26d3-41b2-49a...|      10001|        device|2023-01-05 11:13:...|    D001|      C|  ERROR|         15|
    |   CI00103|e3cb26d3-41b2-49a...|      10001|        device|2023-01-05 11:13:...|    D002|      C|SUCCESS|         16|
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D004|      C|SUCCESS|         20|
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D004|      C|SUCCESS|          1|
    |   CI00101|1450324a-c546-417...|      10038|        device|2023-01-05 11:13:...|    D002|      C|SUCCESS|         21|
    +----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+

🔹 7. OUTRO

  • 이번 글에서는 특정 디렉토리에 쌓이는 File 데이터를 Spark Streaming을 통해 실시간으로 처리하는 코드를 다뤄 보았다. 이후 글에서는 Kafka를 통해 수집되는 데이터를 읽어 처리하는 방법에 대해서 다뤄보도록 할 것이다.
profile
데이터 엔지니어의 작업공간 / #PYTHON #SPARK #AWS #NCLOUD
post-custom-banner

0개의 댓글