[K8S-SPARK 2부] Spark on K8S 환경에서 custom pyspark 스크립트 실행하기

NewNewDaddy·2023년 12월 23일
0

SPARK

목록 보기
7/17
post-thumbnail

Spark-on-K8S 시리즈 목차

0. INTRO

  • 1편 (spark-on-k8s-operator를 통한 쿠버네티스에서 Spark Job 실행 환경 만들기) 에서 이어지는 글입니다.
  • 이번 글에서는 실행하고자 하는 Custom한 spark 스크립트를 Image에 함께 포함시켜 새로운 Docker Image를 만들어 Spark Operator를 통해 해당 스크립트를 실행해보는 과정을 다뤄볼 것이다. 즉, 원래 사용된 apache/spark 이미지 내에 존재하는 기본 실행파일이 아닌, 내가 원하는 pyspark 스크립트가 Kubernetes Native 하게 실행될 수 있도록 하는 것이다.

1. 베이스로 사용될 Dockerfile 고르기

  • Spark Operator로 작업을 실행할 때 driver와 executor의 Docker Image는 apache/spark 이미지를 사용하였다. 따라서 커스텀할 Docker Image도 해당 이미지와 동일한 것으로 선택했다. 글을 작성하는 2023년 12월 23일 현재, apache/spark:latest는 spark 3.5.0 버전을 사용하고 있다. 이미지의 공식 Dockerfile은 아래 github 링크에서 버전별로 확인이 가능하다.

    
    FROM spark:3.5.0-scala2.12-java11-ubuntu
    
    USER root
    
    RUN set -ex; \
        apt-get update; \
        apt-get install -y python3 python3-pip; \
        rm -rf /var/lib/apt/lists/*
    
    USER spark

2. Docker Image 커스텀하기

  • 위에서 고른 apache/spark:latest Dockerfile을 커스텀하여 새로 생성될 이미지 내에 내가 실행하려고 하는 Spark 스크립트와 csv dataset 등 필요한 파일들을 넣어줄 것이다.

  • 커스텀한 내용은 아래와 같다.

    1. /workspace/pyspark 경로 아래에 spark 스크립트 (pyspark.py)와 해당 스크립트에서 읽어들일 샘플 dataset (names.csv) 파일을 넣어준다.
    2. requirements.txt에 명시된 python 라이브러리들을 설치해준다.
  • 디렉토리 구성도

    custom_image/
          ├── Dockerfile
          ├── names.csv
          ├── read-dataset.py
          └── requirements.txt
    1. Dockerfile

      
      FROM spark:3.4.1-scala2.12-java11-python3-ubuntu
      
      USER root
      
      WORKDIR /workspace/pyspark
      
      COPY . /workspace/pyspark
      
      RUN set -ex; \
          apt-get update; \
          apt-get install -y python3 python3-pip; \
          rm -rf /var/lib/apt/lists/*
      
      RUN pip3 install --no-cache-dir -r requirements.txt
      
      USER spark
    2. read-dataset.py

      • 동일 경로상에 있는 csv 파일을 pandas와 spark로 각각 읽어들인 후 dataframe의 row 10개를 출력하는 간단한 함수를 작성한다.
      
      from pyspark.sql import SparkSession
      import pandas as pd
      import os
      
      spark = SparkSession.builder \
              .master("local") \
              .appName("newnewdaddy") \
              .getOrCreate()
      
      # csv dataset 경로
      data_path = os.path.join(os.getcwd(), 'names.csv')
      
      print("="*100)
      print("FILES IN THIS DIRECTORY")
      print(os.listdir(os.getcwd()))
      print("="*100)
      
      # 1. pandas dataframe 생성 및 출력
      pdf = pd.read_csv(data_path)
      print("="*100)
      print("PANDAS DATAFRAME")
      print(pdf.head(10))
      print("="*100)
      
      # 2. spark dataframe 생성 및 출력
      sdf = spark.read.csv(data_path, header=True)
      print("="*100)
      print("SPARK DATAFRAME")
      print(sdf.show(10))
      print("="*100)
  • Docker Build 진행

    # Dockerhub Push를 위해 'hyunsoolee0506/spark:3.5.0-v0.1' 이름으로 image를 생성해준다.
    
    	> docker build -t hyunsoolee0506/spark:3.5.0-v0.1 .
      
    # Dockerhub에 Push 해준다.
    
    	> docker push hyunsoolee0506/spark:3.5.0-v0.1

3. Custom Docker Image를 사용하여 Spark Job 실행

  • Dockerhub에 이미지 push가 끝났다면 이제 SparkApplication YAML 파일에서 해당 이미지를 사용하여 /workspace/pyspark/read-dataset.py 경로에 있는 python 파일을 실행해준다.

  • custom-job.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: custom-job-1
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: hyunsoolee0506/spark:3.5.0-v0.1 # Custom Docker Image 이름
  imagePullPolicy: Always
  mainApplicationFile: local:///workspace/pyspark/read-dataset.py # Docker Image 내 실행파일 경로
  sparkVersion: "3.5.0"
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.0
    serviceAccount: spark-spark-operator
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.0

kubectl apply -f custom-job.yaml

  • 실행 후 확인해보면 custom-job-1으로 명명한 job의 driver pod가 뜨고 얼마 지나지 않아 Completed 된 것을 확인할 수 있다.

  • 완료된 driver pod의 로그를 확인해보면 read-dataset.py 코드가 실행되면서 dataset을 읽어와 안에 적어놓은 print문들이 잘 출력된 것을 볼 수 있다.

23/12/23 13:23:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc, 7079, None)
23/12/23 13:23:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc, 7079, None)
====================================================================================================
FILES IN THIS DIRECTORY
['requirements.txt', 'Dockerfile', 'names.csv', 'read-dataset.py']
====================================================================================================
====================================================================================================
PANDAS DATAFRAME
   id       name  year gender  count
0   1       Mary  1880      F   7065
1   2       Anna  1880      F   2604
2   3       Emma  1880      F   2003
3   4  Elizabeth  1880      F   1939
4   5     Minnie  1880      F   1746
5   6   Margaret  1880      F   1578
6   7        Ida  1880      F   1472
7   8      Alice  1880      F   1414
8   9     Bertha  1880      F   1320
9  10      Sarah  1880      F   1288
====================================================================================================
.
.
.
.
.
<<중간 생략>>
.
.
.
.
.
====================================================================================================
SPARK DATAFRAME
23/12/23 13:23:49 INFO FileSourceStrategy: Pushed Filters: 
23/12/23 13:23:49 INFO FileSourceStrategy: Post-Scan Filters: 
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 203.7 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 35.4 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc:7079 (size: 35.4 KiB, free: 116.9 MiB)
23/12/23 13:23:49 INFO SparkContext: Created broadcast 3 from showString at <unknown>:0
23/12/23 13:23:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4236155 bytes, open cost is considered as scanning 4194304 bytes.
23/12/23 13:23:49 INFO SparkContext: Starting job: showString at <unknown>:0
23/12/23 13:23:49 INFO DAGScheduler: Got job 1 (showString at <unknown>:0) with 1 output partitions
23/12/23 13:23:49 INFO DAGScheduler: Final stage: ResultStage 1 (showString at <unknown>:0)
23/12/23 13:23:49 INFO DAGScheduler: Parents of final stage: List()
23/12/23 13:23:49 INFO DAGScheduler: Missing parents: List()
23/12/23 13:23:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[12] at showString at <unknown>:0), which has no missing parents
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 11.0 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 5.9 KiB, free 116.2 MiB)
23/12/23 13:23:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc:7079 (size: 5.9 KiB, free: 116.9 MiB)
23/12/23 13:23:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1535
23/12/23 13:23:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[12] at showString at <unknown>:0) (first 15 tasks are for partitions Vector(0))
23/12/23 13:23:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/12/23 13:23:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (custom-job-1-driver, executor driver, partition 0, PROCESS_LOCAL, 7915 bytes) 
23/12/23 13:23:50 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
23/12/23 13:23:50 INFO FileScanRDD: Reading File path: file:///workspace/pyspark/names.csv, range: 0-41851, partition values: [empty row]
23/12/23 13:23:50 INFO CodeGenerator: Code generated in 30.9133 ms
23/12/23 13:23:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1809 bytes result sent to driver
23/12/23 13:23:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 129 ms on custom-job-1-driver (executor driver) (1/1)
23/12/23 13:23:50 INFO DAGScheduler: ResultStage 1 (showString at <unknown>:0) finished in 0.201 s
23/12/23 13:23:50 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
23/12/23 13:23:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
23/12/23 13:23:50 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/12/23 13:23:50 INFO DAGScheduler: Job 1 finished: showString at <unknown>:0, took 0.213727 s
23/12/23 13:23:50 INFO CodeGenerator: Code generated in 69.749829 ms
+---+---------+----+------+-----+
| id|     name|year|gender|count|
+---+---------+----+------+-----+
|  1|     Mary|1880|     F| 7065|
|  2|     Anna|1880|     F| 2604|
|  3|     Emma|1880|     F| 2003|
|  4|Elizabeth|1880|     F| 1939|
|  5|   Minnie|1880|     F| 1746|
|  6| Margaret|1880|     F| 1578|
|  7|      Ida|1880|     F| 1472|
|  8|    Alice|1880|     F| 1414|
|  9|   Bertha|1880|     F| 1320|
| 10|    Sarah|1880|     F| 1288|
+---+---------+----+------+-----+
only showing top 10 rows

None
====================================================================================================
23/12/23 13:23:50 INFO SparkContext: SparkContext is stopping with exitCode 0.

4. OUTRO

  • 이번 글은

    내가 원하는 spark 스크립트를 어떻게 하면 쿠버네티스 환경에서 Spark Operator를 통해 실행되도록 할 수 있을까?

    라는 생각에서 시작되었다.

  • 결국 Kubernetes에서 Spark로 데이터 파이프라인을 구성하려면 내가 실행하고자 하는 코드가 실행되어야 했다. 이미지 내의 샘플 코드들만 계속 실행해봤자 "아~ 이렇게 동작이 되는구나" 그 이상의 의미를 가지지 못하기 때문에 결국에는 어떻게든 나의 코드가 실행될 수 있는 방법을 찾아야 이를 위해 들인 나의 시간들도 의미가 있을 것이었다.

  • 로컬에 있는 코드가 driver와 executor pod 실행시 반영되도록 하기 위해 YAML 인자들을 고쳐도보고, spark-submit의 configure 설정도 해보고, helm chart의 value들을 overriding하여 설치해보는 등 제법 다양한 시도들을 해보았는데 생각만큼 수월하게 해결되지는 않았다. 결국 로컬의 코드를 포함한 새로운 Docker Image를 만들어 YAML에서 참조하는 방법을 선택하였지만 이보다 더 유연하고 편하게 driver와 executor pod들을 다룰 수 있도록 하는 것은 나의 숙제로 남게 되었다.

  • 다음 글에서는

    어떻게 하면 Container 환경에서 작동하는 spark 스크립트가 AWS S3혹은 Ncloud Object Storage 등의 Cloud Object Storage에 테이블을 저장할 수 있을까

    에 대하여 다뤄볼 예정이다.

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

2개의 댓글

comment-user-thumbnail
2024년 5월 17일

안녕하세요. 글 항상 잘 보고 있습니다.
도커파일에서 run 부분에 rm -rf /var/lib/apt/lists/*
이거 해주는 이유가 무엇인가요?

1개의 답글