spark-on-k8s-operator
를 helm을 이용해 설치하여 쿠버네티스 환경에서 커스텀한 pyspark 스크립트를 실행하는 과정들에 대해서 다뤄보았다. 사용한 예제들은 스크립트 내에서 특정 값을 계산한다던가, 특정 경로에 있는 파일을 읽어와 dataframe으로 만든 후 해당 dataframe을 출력하는 등 최종적인 테이블 형태를 어딘가에 저장하는 과정은 딱히 없었다. Spark 사용이 가능한 Jupyter Lab 환경
에서 어떤식을 코드를 구성하고 configure를 주어야 Container 환경에서 돌아가는 코드가 클라우드 저장소에 데이터를 저장할 수 있는지 알아보도록 할 것이다.Spark 환경에서 AWS와 통신을 할 수 있도록 하려면 hadoop-aws
라는 Maven artifact가 필요하다. 이는
"Apache Hadoop을 AWS와 통합할 수 있게 해주는 라이브러리"
이렇게 정의할 수 있으며 이 라이브러리를 통해 Hadoop Cluster에서 AWS S3와 같은 AWS 서비스와 상호작용을 할 수 있는 것이다.
해당 라이브러리에서 지원하는 주요 기능들은 크게 아래와 같다.
S3 파일 시스템 지원: hadoop-aws는 Hadoop 클러스터에서 AWS S3를 파일 시스템으로 사용할 수 있도록 지원합니다. 따라서 Hadoop 작업은 S3에 저장된 데이터를 읽거나 쓸 수 있습니다.
AWS 인증 및 자격 증명 관리: AWS 서비스에 접근할 때 사용되는 AWS 계정 자격 증명 및 인증에 관련된 기능을 제공합니다.
AWS 리소스 관리: Hadoop 작업에서 AWS 리소스를 효과적으로 관리할 수 있도록 도와줍니다.
관련 자료
아래 글에 도움되는 기능들에 대한 소개 및 실습에 대한 내용들이 잘 정리되어 있어 해당 글을 참고하여 실습을 진행하였다.
해당 글에 나온 Dockerfile 코드를 동일하게 사용하진 않고 SPARK_HOME
, requirements.txt
파일 등 약간의 수정을 하여 사용하였다.
Dockerfile에서 설치되는 라이브러리들은 크게 아래와 같다.
apache-spark 3.1.2
- apache spark 공식 zip 파일hadoop-aws 3.2.0
- Apache Hadoop을 AWS와 통합할 수 있게 해주는 라이브러리aws-java-sdk 1.11.375
- AWS와 상호작용하기 위한 Java 공식 소프트웨어 개발 키트JetS3t
- Java로 작성된 Amazon S3와 상호 작용하기 위한 라이브러리 및 도구 키트또한 Base Image로 `jupyter/scipy-notebook'을 사용하고 있기 때문에 jupyter lab 환경을 위한 8888 포트가 Expose되어 있어 해당 포트로 접속하여 Notebook 환경으로 들어갈 수 있다.
Dockerfile
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
ARG OWNER=jupyter
ARG BASE_CONTAINER=$OWNER/scipy-notebook
FROM $BASE_CONTAINER
LABEL maintainer="Jupyter Project <jupyter@googlegroups.com>"
# Fix DL4006
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
USER root
# Spark dependencies
# Default values can be overridden at build time
# (ARGS are in lower case to distinguish them from ENV)
ARG spark_version="3.1.2"
ARG hadoop_version="3.2"
ARG spark_checksum="2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B70FAE62B5A96146F0421CB93D4E51491308EF5D5"
ARG openjdk_version="11"
ARG aws_hadoop_version="3.2.0"
ARG aws_sdk_version="1.11.375"
ENV APACHE_SPARK_VERSION="${spark_version}" \
HADOOP_VERSION="${hadoop_version}" \
AWS_SDK_VERSION="${aws_sdk_version}" \
AWS_HADOOP_VERSION="${aws_hadoop_version}"
ENV JUPYTER_ENABLE_LAB=yes
RUN apt-get update --yes && \
apt-get install --yes --no-install-recommends \
"openjdk-${openjdk_version}-jre-headless" \
ca-certificates-java \
curl \
awscli && \
apt-get clean && rm -rf /var/lib/apt/lists/*
# Install requirements
COPY requirements.txt ./
RUN pip3 install --upgrade pip && pip3 install --no-cache-dir -r requirements.txt
RUN apt-get clean && rm requirements.txt
# Spark installation
WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"
WORKDIR /usr/local
# Configure Spark
ENV SPARK_HOME=/opt/spark
ENV SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
PATH="${PATH}:${SPARK_HOME}/bin"
RUN ln -s "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" spark && \
# Add a link in the before_notebook hook in order to source automatically PYTHONPATH
mkdir -p /usr/local/bin/before-notebook.d && \
ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh
# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"
# Add hadoop-aws and aws-sdk
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${AWS_HADOOP_VERSION}/hadoop-aws-${AWS_HADOOP_VERSION}.jar -P "${SPARK_HOME}/jars/" && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P "${SPARK_HOME}/jars/"
# Adding JetS3t libary
RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "${SPARK_HOME}/jars/"
# Install pyarrow
RUN mamba install --quiet --yes \
'pyarrow' && \
mamba clean --all -f -y && \
fix-permissions "${CONDA_DIR}" && \
fix-permissions "/home/${NB_USER}"
USER ${NB_UID}
WORKDIR "${HOME}"
# Dockerfile 빌드 진행
> docker build -t hyunsoolee0506/pyspark:aws0.1 .
jupyter-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: jupyter-deployment
spec:
selector:
matchLabels:
app: jupyter
replicas: 1
template:
metadata:
labels:
app: jupyter
spec:
containers:
- name: jupyter-container
image: hyunsoolee0506/pyspark:aws0.1
ports:
- containerPort: 8888
jupyter-service.yaml
apiVersion: v1
kind: Service
metadata:
name: jupyter-service
labels:
app: jupyter
spec:
type: LoadBalancer
selector:
app: jupyter
ports:
- protocol: TCP
port: 8888
targetPort: 8888
위의 YAML 파일들을 실행시키면 Cloud에 Load Balancer가 하나 자동으로 생성된다. 현재 클러스터는 Ncloud의 NKS Cluster에서 작업을 하고 있기 때문에 Naver Cloud Platform에 접속하여 Load Balancer URL을 확인하고 Load Balancer URL:8888
로 접속한다.
Token의 경우 jupyter-deployment pod의 로그를 출력해보면 확인할 수 있다.
> kubectl logs [pod 이름] | grep token
[I 2023-12-23 15:56:43.312 ServerApp] http://jupyter-deployment-74d6fd7f87-45mp9:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
[I 2023-12-23 15:56:43.312 ServerApp] http://127.0.0.1:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
http://jupyter-deployment-74d6fd7f87-45mp9:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
http://127.0.0.1:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
hadoopConfiguration
에 대한 설정이 필요하다. 현재 사용중인 Ncloud의 Object Storage는 감사하게도 AWS S3와 동일한 CLI와 Python SDK를 사용하기 때문에 S3에 대한 설정값과 거의 동일하게 설정을 해주면 잘 작동하는 것을 확인할 수 있다. 아래 예제 코드에서는 AWS S3 저장 관련된 configure 설정
과 Ncloud의 Object Storage 저장 관련된 configure 설정
두 가지를 다룰 것이다.s3a
프로토콜을 통한 S3 통합이 가장 잘 지원되므로 S3 주소를 쓸 때 s3a://
형식으로 쓰도록 한다.import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as T
spark = SparkSession.builder \
.master("local") \
.appName("aws") \
.getOrCreate()
access_key = "[AWS KEY ID]"
secret_key = "[AWS SECRET KEY]"
# hadoop configure 설정
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
schema = T.StructType(
[
T.StructField("col_a", T.IntegerType(), True),
T.StructField("col_b", T.IntegerType(), True),
T.StructField("col_c", T.IntegerType(), True)
]
)
rows = [(1,2,3), (4,5,6), (7,8,9)]
df = spark.createDataFrame(rows, schema)
df_new.write.mode('overwrite').parquet('s3a://[S3 Bucket]/your/dir/path')
Ncloud의 hadoop 관련 configure 설정은 공식 Docs에서 확인할 수 있다. -> Ncloud Docs - Hive 사용
fs.s3a.access.key=<API-ACCESS-KEY>
fs.s3a.connection.ssl.enabled=false
fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.secret.key=<API-SECRET-KEY>
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as T
spark = SparkSession.builder \
.master("local") \
.appName("aws") \
.getOrCreate()
access_key = "[NCP KEY ID]"
secret_key = "[NCP SECRET KEY]"
# hadoop configure 설정
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# Ncloud 관련 추가 configure 설정
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")
schema = T.StructType(
[
T.StructField("col_a", T.IntegerType(), True),
T.StructField("col_b", T.IntegerType(), True),
T.StructField("col_c", T.IntegerType(), True)
]
)
rows = [(1,2,3), (4,5,6), (7,8,9)]
df = spark.createDataFrame(rows, schema)
df_new.write.mode('overwrite').parquet('s3a://[NOS Bucket]/your/dir/path')
SparkApplication
YAML 파일이 실행되어 Spark 작업이 진행되면서 저장되도록 프로세스가 엮여야 할 것이다.