jupyter/scipy-notebook
을 사용하였다. 그러다보니 이미지 크기만 4GB가 넘어가 상당히 무거운 이미지가 되었다.apache/spark
로 바꾸어 조금 더 가벼운 이미지가 생성될 수 있도록 커스텀 작업을 진행하였고 spark 버전 역시 가장 최신 버전인 3.5.0 버전에 맞도록 다른 모듈들 버전들도 전체적으로 바꾸어주었다.apache/spark
베이스 이미지로 사용Dockerfile
FROM apache/spark
USER root
ARG spark_version="3.5.0"
ARG hadoop_version="3"
# https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz.sha512 에서 확인
ARG spark_checksum="8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
ARG openjdk_version="11"
ARG aws_hadoop_version="3.3.4"
ARG aws_sdk_version="1.12.262"
# 환경 변수 설정
ENV APACHE_SPARK_VERSION="${spark_version}" \
HADOOP_VERSION="${hadoop_version}" \
AWS_SDK_VERSION="${aws_sdk_version}" \
AWS_HADOOP_VERSION="${aws_hadoop_version}"
ENV JUPYTER_ENABLE_LAB=yes
# 기본 라이브러리들 설치
RUN apt-get update --yes; \
apt-get install --yes --no-install-recommends \
"openjdk-${openjdk_version}-jre-headless" \
ca-certificates-java curl awscli; \
set -ex; \
apt-get install -y python3 python3-pip; \
apt-get clean; \
rm -rf /var/lib/apt/lists/*
# Spark Install
WORKDIR /opt
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
tar -zxvf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /opt --owner root --group root --no-same-owner && \
rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"
# Configure Spark
ENV SPARK_HOME="/opt/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
ENV SPARK_TGZ_URL="https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"
ENV SPARK_TGZ_ASC_URL="https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz.asc"
ENV PATH="${PATH}:${SPARK_HOME}/bin"
# Fix Spark installation for Java 11 and Apache Arrow library
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"
# Adding hadoop-aws and aws-sdk
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${AWS_HADOOP_VERSION}/hadoop-aws-${AWS_HADOOP_VERSION}.jar -P "${SPARK_HOME}/jars/" && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P "${SPARK_HOME}/jars/"
# Adding JetS3t libary
RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "${SPARK_HOME}/jars/"
# pyspark 스크립트 경로 설정
WORKDIR /workspace/pyspark
COPY . /workspace/pyspark
# 필요 라이브러리 설치 및 필요없는 디렉토리 제거
RUN pip3 install --no-cache-dir -r requirements.txt; \
rm -rf /opt/spark
위의 Dockerfile로 이미지를 생성하면 pyspark script는 /workspace/pyspark
이하에 저장되게 된다.
스크립트와 동일 경로에 있는 names.csv
파일을 읽어 AWS S3와 Ncloud OS에 parquet 파일 형태로 저장하는 스크립트를 각각 작성해보자.
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder \
.master("local") \
.appName("hadoop-aws") \
.getOrCreate()
sc = spark.sparkContext
access_key = "[AWS API ID KEY]"
secret_key = "[AWS SECRET KEY]"
# HADOOP 관련 설정
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
df.write.mode('overwrite').parquet(f's3a://[S3-BUCKET]/custom/dir/path')
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder \
.master("local") \
.appName("hadoop-ncloud") \
.getOrCreate()
sc = spark.sparkContext
access_key = "[NCLOUD API KEY ID]"
secret_key = "[NCLOUD SECRET KEY]"
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")
df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
df.write.mode('overwrite').parquet(f's3a://[NOS-BUCKET]/custom/dir/path')
Spark Operator에서 실행될 위의 두 스크립트가 포함될 수 있도록 Docker Image를 생성한다. 이미지 이름은 hyunsoolee0506/spark:3.5.0
으로 하였다.
# Dockerfile build
> docker build -t hyunsoolee0506/spark:3.5.0 .
[+] Building 45.6s (16/16) FINISHED
=> [internal] load build definition from Dockerfile 0.1s
=> => transferring dockerfile: 2.82kB 0.0s
=> [internal] load .dockerignore 0.1s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for docker.io/apache/spark:latest 2.0s
=> [auth] apache/spark:pull token for registry-1.docker.io 0.0s
=> [internal] load build context 0.0s
=> => transferring context: 2.99kB 0.0s
=> [ 1/10] FROM docker.io/apache/spark@sha256:0ed5154e6b32ac3af1272d4d65e9f65b13afcfe80b41ad10bd059bcd6317863c 0.0s
=> CACHED [ 2/10] RUN apt-get update --yes; apt-get install --yes --no-install-recommends "openjdk-11-jre-headless" ca-certificates-java curl awscli; set -ex; apt-get install -y python3 python3- 0.0s
=> CACHED [ 3/10] WORKDIR /opt 0.0s
=> CACHED [ 4/10] RUN wget -q "https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz" && echo "2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B7 0.0s
=> CACHED [ 5/10] RUN cp -p "/opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf.template" "/opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf" && echo 'spark.driver.extraJavaOptions -Dio.netty.tryRefl 0.0s
=> CACHED [ 6/10] RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar -P "/opt/spark-3.1.2-bin-hadoop3.2/jars/" && wget https://repo1.maven.org/maven2/com/amazonaws/a 0.0s
=> CACHED [ 7/10] RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "/opt/spark-3.1.2-bin-hadoop3.2/jars/" 0.0s
=> CACHED [ 8/10] WORKDIR /workspace/pyspark 0.0s
=> [ 9/10] COPY . /workspace/pyspark 0.0s
=> [10/10] RUN pip3 install --no-cache-dir -r requirements.txt; rm -rf /opt/spark 40.9s
=> exporting to image 2.5s
=> => exporting layers 2.4s
=> => writing image sha256:ace5e474a844c501f17220ffaf8dd9aa73004ea36975fd2279e9f634332a969e 0.0s
=> => naming to docker.io/library/hyunsoolee0506/spark:3.5.0
# Docker Push
> docker push hyunsoolee0506/spark:3.5.0
.
.
.
.
1646200a958a: Mounted from apache/spark
6d3bc029a93b: Mounted from apache/spark
59d98d956092: Mounted from apache/spark
a295ff97538c: Mounted from apache/spark
0b4923f16514: Mounted from apache/spark
954c82bdeb5f: Mounted from apache/spark
3.5.0: digest: sha256:7ac6bcda10daf05e474be01944b20bdddc6b58cd49c813aeee58c2e7868d4a02 size: 4516
hyunsoolee0506/spark:3.5.0
Docker Image의 /workspace/pyspark
디렉토리 이하 구조는 아래와 같을 것이다.pyspark/
├── Dockerfile
├── names.csv
├── ncloud-aws.py
├── ncloud-nos.py
└── requirements.txt
hyunsoolee0506/spark:3.5.0
이미지를 사용해서 Spark Operator를 실행할 것이다. 그렇게 되면 작업이 진행되면서 해당 이미지를 사용한 driver와 executor pod가 만들어질 것이고 이미지 내에 spark 작업과 cloud 저장소와 통신할 수 있는 구성 파일들이 모두 포함되어 있기 때문에 문제없이 작업이 완료 될 것이다.pyspark-job-ncp.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: pyspark-job-ncp
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: hyunsoolee0506/spark:3.5.0
imagePullPolicy: Always
mainApplicationFile: local:///workspace/pyspark/ncloud-nos.py
sparkVersion: "3.5.0"
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-spark-operator
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.5.0
SparkApplication YAML 파일을 apply 해주면 아래와 같이 pod가 Running -> Completed 되면서 작업이 잘 수행되는 것을 볼 수 있다.
또한 Ncloud OS 버킷을 확인해보면 ncloud-nos.py
파일에서 명시한 경로 아래에 데이터가 잘 저장된 것을 확인할 수 있다. (당연히 ncloud-aws.py 스크립트를 명시하여 실행하면 AWS S3 버킷에 데이터가 저장되게된다.)
우선 pyspark 스크립트 코드를 변경한다. os.environ.get("NCP_ACCESS_KEY_ID")
코드를 사용하여 환경 변수에서 값을 가져오도록만 고쳐준다.
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder \
.master("local") \
.appName("hadoop-ncloud") \
.getOrCreate()
sc = spark.sparkContext
access_key = os.environ.get("NCP_ACCESS_KEY_ID")
secret_key = os.environ.get("NCP_SECRET_ACCESS_KEY")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")
df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
df.write.mode('overwrite').parquet(f's3a://[NOS-BUCKET]/custom/dir/path')
API KEY 정보를 담은 secret 생성
kubectl create secret generic ncp-secret-key \
--from-literal=NCP_ACCESS_KEY_ID=[KEY_ID] \
--from-literal=NCP_SECRET_ACCESS_KEY=[KEY_SECRET]
넘겨줄 env 변수가 secret을 참조하도록 SparkApplication YAML 파일 수정
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: custom-job-ncp2
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: hyunsoolee0506/spark:3.1.2-v2
imagePullPolicy: Always
mainApplicationFile: local:///workspace/pyspark/nos-read-write.py
sparkVersion: "3.5.0"
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-spark-operator
envFrom:
- secretRef:
name: ncp-secret-key
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.5.0
envFrom:
- secretRef:
name: ncp-secret-key
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
name: pyspark-job-schedule
spec:
schedule: "@every 1m"
concurrencyPolicy: Allow
template:
type: Python
pythonVersion: "3"
mode: cluster
image: hyunsoolee0506/spark:3.5.0
imagePullPolicy: Always
mainApplicationFile: local:///workspace/pyspark/nos-read-write.py
sparkVersion: "3.5.0"
timeToLiveSeconds: 120
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-spark-operator
envFrom:
- secretRef:
name: ncp-secret-key
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.5.0
envFrom:
- secretRef:
name: ncp-secret-key
apply를 해준 후 ScheduledSparkApplication 리소스를 확인해보면 아래와 같이 job에 대해 스케쥴링이 되어있는 것을 확인할 수 있으며 지정된 시간이 지나면 자동으로 작업이 진행된다.
spark-on-k8s-operator
에 대해 다뤄진 글들이 그렇게 많지는 않아서 매 번 에러를 만날때마다 그나마 친절하게 작성된 Docs를 참고하거나 굉장히 오랜 고민의 시간들을 가졌다. 특히 hadoop과 aws를 연동하는 hadoop-aws, aws-java-sdk, JetS3t 등의 패키지들을 spark에 설치할 때 호환되는 버전이 맞지 않아서 에러가 나는 경우가 상당히 많았다. 참고했던 블로그에서 사용한 버전은 spark 3.1.2였는데 최신 버전인 3.5.0으로 올려서 사용하고 싶은 욕심이 계속 들어 그 패키지들 버전 관련 에러 해결에만 상당한 시일이 걸렸다.....spark-on-k8s-operator
에 대한 내용들을 조금 더 발전시켜 쿠버네티스상에서 Spark Data Pipeline을 구축하여 운영해볼 수 있도록 할 것이다.
dockerfile 커스텀 하실 때 requirements.txt에 무슨 내용이 들어갔는지도 알 수 있을까요?