apache/spark
이미지 내에 존재하는 기본 실행파일이 아닌, 내가 원하는 pyspark 스크립트가 Kubernetes Native 하게 실행될 수 있도록 하는 것이다.Spark Operator로 작업을 실행할 때 driver와 executor의 Docker Image는 apache/spark
이미지를 사용하였다. 따라서 커스텀할 Docker Image도 해당 이미지와 동일한 것으로 선택했다. 글을 작성하는 2023년 12월 23일 현재, apache/spark:latest
는 spark 3.5.0 버전을 사용하고 있다. 이미지의 공식 Dockerfile은 아래 github 링크에서 버전별로 확인이 가능하다.
FROM spark:3.5.0-scala2.12-java11-ubuntu
USER root
RUN set -ex; \
apt-get update; \
apt-get install -y python3 python3-pip; \
rm -rf /var/lib/apt/lists/*
USER spark
위에서 고른 apache/spark:latest
Dockerfile을 커스텀하여 새로 생성될 이미지 내에 내가 실행하려고 하는 Spark 스크립트와 csv dataset 등 필요한 파일들을 넣어줄 것이다.
커스텀한 내용은 아래와 같다.
/workspace/pyspark
경로 아래에 spark 스크립트 (pyspark.py)와 해당 스크립트에서 읽어들일 샘플 dataset (names.csv) 파일을 넣어준다.requirements.txt
에 명시된 python 라이브러리들을 설치해준다.디렉토리 구성도
custom_image/
├── Dockerfile
├── names.csv
├── read-dataset.py
└── requirements.txt
Dockerfile
FROM spark:3.4.1-scala2.12-java11-python3-ubuntu
USER root
WORKDIR /workspace/pyspark
COPY . /workspace/pyspark
RUN set -ex; \
apt-get update; \
apt-get install -y python3 python3-pip; \
rm -rf /var/lib/apt/lists/*
RUN pip3 install --no-cache-dir -r requirements.txt
USER spark
read-dataset.py
from pyspark.sql import SparkSession
import pandas as pd
import os
spark = SparkSession.builder \
.master("local") \
.appName("newnewdaddy") \
.getOrCreate()
# csv dataset 경로
data_path = os.path.join(os.getcwd(), 'names.csv')
print("="*100)
print("FILES IN THIS DIRECTORY")
print(os.listdir(os.getcwd()))
print("="*100)
# 1. pandas dataframe 생성 및 출력
pdf = pd.read_csv(data_path)
print("="*100)
print("PANDAS DATAFRAME")
print(pdf.head(10))
print("="*100)
# 2. spark dataframe 생성 및 출력
sdf = spark.read.csv(data_path, header=True)
print("="*100)
print("SPARK DATAFRAME")
print(sdf.show(10))
print("="*100)
Docker Build 진행
# Dockerhub Push를 위해 'hyunsoolee0506/spark:3.5.0-v0.1' 이름으로 image를 생성해준다.
> docker build -t hyunsoolee0506/spark:3.5.0-v0.1 .
# Dockerhub에 Push 해준다.
> docker push hyunsoolee0506/spark:3.5.0-v0.1
Dockerhub에 이미지 push가 끝났다면 이제 SparkApplication
YAML 파일에서 해당 이미지를 사용하여 /workspace/pyspark/read-dataset.py
경로에 있는 python 파일을 실행해준다.
custom-job.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: custom-job-1
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: hyunsoolee0506/spark:3.5.0-v0.1 # Custom Docker Image 이름
imagePullPolicy: Always
mainApplicationFile: local:///workspace/pyspark/read-dataset.py # Docker Image 내 실행파일 경로
sparkVersion: "3.5.0"
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-spark-operator
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.5.0
kubectl apply -f custom-job.yaml
실행 후 확인해보면 custom-job-1
으로 명명한 job의 driver pod가 뜨고 얼마 지나지 않아 Completed 된 것을 확인할 수 있다.
완료된 driver pod의 로그를 확인해보면 read-dataset.py
코드가 실행되면서 dataset을 읽어와 안에 적어놓은 print문들이 잘 출력된 것을 볼 수 있다.
23/12/23 13:23:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc, 7079, None)
23/12/23 13:23:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc, 7079, None)
====================================================================================================
FILES IN THIS DIRECTORY
['requirements.txt', 'Dockerfile', 'names.csv', 'read-dataset.py']
====================================================================================================
====================================================================================================
PANDAS DATAFRAME
id name year gender count
0 1 Mary 1880 F 7065
1 2 Anna 1880 F 2604
2 3 Emma 1880 F 2003
3 4 Elizabeth 1880 F 1939
4 5 Minnie 1880 F 1746
5 6 Margaret 1880 F 1578
6 7 Ida 1880 F 1472
7 8 Alice 1880 F 1414
8 9 Bertha 1880 F 1320
9 10 Sarah 1880 F 1288
====================================================================================================
.
.
.
.
.
<<중간 생략>>
.
.
.
.
.
====================================================================================================
SPARK DATAFRAME
23/12/23 13:23:49 INFO FileSourceStrategy: Pushed Filters:
23/12/23 13:23:49 INFO FileSourceStrategy: Post-Scan Filters:
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 203.7 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 35.4 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc:7079 (size: 35.4 KiB, free: 116.9 MiB)
23/12/23 13:23:49 INFO SparkContext: Created broadcast 3 from showString at <unknown>:0
23/12/23 13:23:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4236155 bytes, open cost is considered as scanning 4194304 bytes.
23/12/23 13:23:49 INFO SparkContext: Starting job: showString at <unknown>:0
23/12/23 13:23:49 INFO DAGScheduler: Got job 1 (showString at <unknown>:0) with 1 output partitions
23/12/23 13:23:49 INFO DAGScheduler: Final stage: ResultStage 1 (showString at <unknown>:0)
23/12/23 13:23:49 INFO DAGScheduler: Parents of final stage: List()
23/12/23 13:23:49 INFO DAGScheduler: Missing parents: List()
23/12/23 13:23:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[12] at showString at <unknown>:0), which has no missing parents
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 11.0 KiB, free 116.3 MiB)
23/12/23 13:23:49 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 5.9 KiB, free 116.2 MiB)
23/12/23 13:23:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on custom-job-1-95ac168c96d7fb9a-driver-svc.test.svc:7079 (size: 5.9 KiB, free: 116.9 MiB)
23/12/23 13:23:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1535
23/12/23 13:23:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[12] at showString at <unknown>:0) (first 15 tasks are for partitions Vector(0))
23/12/23 13:23:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/12/23 13:23:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (custom-job-1-driver, executor driver, partition 0, PROCESS_LOCAL, 7915 bytes)
23/12/23 13:23:50 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
23/12/23 13:23:50 INFO FileScanRDD: Reading File path: file:///workspace/pyspark/names.csv, range: 0-41851, partition values: [empty row]
23/12/23 13:23:50 INFO CodeGenerator: Code generated in 30.9133 ms
23/12/23 13:23:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1809 bytes result sent to driver
23/12/23 13:23:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 129 ms on custom-job-1-driver (executor driver) (1/1)
23/12/23 13:23:50 INFO DAGScheduler: ResultStage 1 (showString at <unknown>:0) finished in 0.201 s
23/12/23 13:23:50 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
23/12/23 13:23:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
23/12/23 13:23:50 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/12/23 13:23:50 INFO DAGScheduler: Job 1 finished: showString at <unknown>:0, took 0.213727 s
23/12/23 13:23:50 INFO CodeGenerator: Code generated in 69.749829 ms
+---+---------+----+------+-----+
| id| name|year|gender|count|
+---+---------+----+------+-----+
| 1| Mary|1880| F| 7065|
| 2| Anna|1880| F| 2604|
| 3| Emma|1880| F| 2003|
| 4|Elizabeth|1880| F| 1939|
| 5| Minnie|1880| F| 1746|
| 6| Margaret|1880| F| 1578|
| 7| Ida|1880| F| 1472|
| 8| Alice|1880| F| 1414|
| 9| Bertha|1880| F| 1320|
| 10| Sarah|1880| F| 1288|
+---+---------+----+------+-----+
only showing top 10 rows
None
====================================================================================================
23/12/23 13:23:50 INFO SparkContext: SparkContext is stopping with exitCode 0.
이번 글은
내가 원하는 spark 스크립트를 어떻게 하면 쿠버네티스 환경에서 Spark Operator를 통해 실행되도록 할 수 있을까?
라는 생각에서 시작되었다.
결국 Kubernetes에서 Spark로 데이터 파이프라인을 구성하려면 내가 실행하고자 하는 코드가 실행되어야 했다. 이미지 내의 샘플 코드들만 계속 실행해봤자 "아~ 이렇게 동작이 되는구나" 그 이상의 의미를 가지지 못하기 때문에 결국에는 어떻게든 나의 코드가 실행될 수 있는 방법을 찾아야 이를 위해 들인 나의 시간들도 의미가 있을 것이었다.
로컬에 있는 코드가 driver와 executor pod 실행시 반영되도록 하기 위해 YAML 인자들을 고쳐도보고, spark-submit의 configure 설정도 해보고, helm chart의 value들을 overriding하여 설치해보는 등 제법 다양한 시도들을 해보았는데 생각만큼 수월하게 해결되지는 않았다. 결국 로컬의 코드를 포함한 새로운 Docker Image를 만들어 YAML에서 참조하는 방법을 선택하였지만 이보다 더 유연하고 편하게 driver와 executor pod들을 다룰 수 있도록 하는 것은 나의 숙제로 남게 되었다.
다음 글에서는
어떻게 하면 Container 환경에서 작동하는 spark 스크립트가 AWS S3혹은 Ncloud Object Storage 등의 Cloud Object Storage에 테이블을 저장할 수 있을까
에 대하여 다뤄볼 예정이다.
안녕하세요. 글 항상 잘 보고 있습니다.
도커파일에서 run 부분에 rm -rf /var/lib/apt/lists/*
이거 해주는 이유가 무엇인가요?