[K8S-SPARK 4부] Spark on K8S 환경에서 Spark Operator 실행하여 Cloud Storage에 결과물 저장하기

NewNewDaddy·2023년 12월 27일
0

SPARK

목록 보기
9/16
post-thumbnail
post-custom-banner

Spark-on-K8S 시리즈 목차

0. INTRO

  • 1부, 2부 - spark-on-k8s-operator를 사용하기 위한 환경구성 및 Spark Operator를 통한 Pyspark Job 실행
  • 3부 - Container 환경에서 Pyspark Job을 통해 Cloud Storage로 결과물 저장하기
  • 마지막 4부에서는 앞서 세 편의 글에서 다뤘던 내용들을 엮어 Pyspark Job을 통해 Cloud Storage로 결과물 저장하는 과정을 Spark Operator를 통해 실행해보는 과정을 다뤄볼 것이다.

1. Dockerfile 커스텀

  • 앞선 글에서는 Spark를 Jupyter Lab의 Interactive한 환경에서 실행해보고 결과물이 저장되는 것을 확인하기 위해 Base Image를 jupyter/scipy-notebook을 사용하였다. 그러다보니 이미지 크기만 4GB가 넘어가 상당히 무거운 이미지가 되었다.
  • 실제 Spark Operation을 사용하면서는 Jupyter Lab 기능을 사용할 필요가 없기 때문에 Base Image를 apache/spark로 바꾸어 조금 더 가벼운 이미지가 생성될 수 있도록 커스텀 작업을 진행하였고 spark 버전 역시 가장 최신 버전인 3.5.0 버전에 맞도록 다른 모듈들 버전들도 전체적으로 바꾸어주었다.
    • apache/spark 베이스 이미지로 사용
    • hadoop 3.5.0 버전 설치 후 configure 설정
    • hadoop-aws, aws-java-sdk, JetS3t의 jar 패키지들을 각각 spark의 jar 경로 아래에 등록
    • workspace 경로 이하에 실행될 pyspark 코드들 저장
    • requirements.txt를 통해 container 내부에서 코드 실행에 필요한 라이브러리들 설치

Dockerfile

FROM apache/spark

USER root

ARG spark_version="3.5.0"
ARG hadoop_version="3"
# https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz.sha512 에서 확인
ARG spark_checksum="8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
ARG openjdk_version="11"
ARG aws_hadoop_version="3.3.4"
ARG aws_sdk_version="1.12.262"

# 환경 변수 설정
ENV APACHE_SPARK_VERSION="${spark_version}" \
    HADOOP_VERSION="${hadoop_version}" \
    AWS_SDK_VERSION="${aws_sdk_version}" \
    AWS_HADOOP_VERSION="${aws_hadoop_version}"
ENV JUPYTER_ENABLE_LAB=yes

# 기본 라이브러리들 설치
RUN apt-get update --yes; \
    apt-get install --yes --no-install-recommends \
    "openjdk-${openjdk_version}-jre-headless" \
    ca-certificates-java curl awscli; \
    set -ex; \
    apt-get install -y python3 python3-pip; \
    apt-get clean; \
    rm -rf /var/lib/apt/lists/*

# Spark Install
WORKDIR /opt

RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
    tar -zxvf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /opt --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"

# Configure Spark
ENV SPARK_HOME="/opt/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
ENV SPARK_TGZ_URL="https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"
ENV SPARK_TGZ_ASC_URL="https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz.asc"
ENV PATH="${PATH}:${SPARK_HOME}/bin"

# Fix Spark installation for Java 11 and Apache Arrow library
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"

# Adding hadoop-aws and aws-sdk
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${AWS_HADOOP_VERSION}/hadoop-aws-${AWS_HADOOP_VERSION}.jar -P "${SPARK_HOME}/jars/" && \
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P "${SPARK_HOME}/jars/"
# Adding JetS3t libary
RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "${SPARK_HOME}/jars/"

# pyspark 스크립트 경로 설정
WORKDIR /workspace/pyspark

COPY . /workspace/pyspark

# 필요 라이브러리 설치 및 필요없는 디렉토리 제거
RUN pip3 install --no-cache-dir -r requirements.txt; \
    rm -rf /opt/spark

2. Pyspark 스크립트 작성 후 Docker build 진행

  • 위의 Dockerfile로 이미지를 생성하면 pyspark script는 /workspace/pyspark 이하에 저장되게 된다.

  • 스크립트와 동일 경로에 있는 names.csv 파일을 읽어 AWS S3와 Ncloud OS에 parquet 파일 형태로 저장하는 스크립트를 각각 작성해보자.

    • ncloud-aws.py
    
    from pyspark.sql import SparkSession
    import os
    
    spark = SparkSession.builder \
            .master("local") \
            .appName("hadoop-aws") \
            .getOrCreate()
    
    sc = spark.sparkContext
    access_key = "[AWS API ID KEY]"
    secret_key = "[AWS SECRET KEY]"
    
    # HADOOP 관련 설정
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    
    df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
    
    df.write.mode('overwrite').parquet(f's3a://[S3-BUCKET]/custom/dir/path')
    • ncloud-nos.py
    
    from pyspark.sql import SparkSession
    import os
    
    spark = SparkSession.builder \
            .master("local") \
            .appName("hadoop-ncloud") \
            .getOrCreate()
    
    sc = spark.sparkContext
    access_key = "[NCLOUD API KEY ID]"
    secret_key = "[NCLOUD SECRET KEY]"
    
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")
    
    df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
    
    df.write.mode('overwrite').parquet(f's3a://[NOS-BUCKET]/custom/dir/path')
  • Spark Operator에서 실행될 위의 두 스크립트가 포함될 수 있도록 Docker Image를 생성한다. 이미지 이름은 hyunsoolee0506/spark:3.5.0 으로 하였다.


# Dockerfile build

  > docker build -t hyunsoolee0506/spark:3.5.0 .
    
    [+] Building 45.6s (16/16) FINISHED
     => [internal] load build definition from Dockerfile                                                                                                                                                                     0.1s 
     => => transferring dockerfile: 2.82kB                                                                                                                                                                                   0.0s 
     => [internal] load .dockerignore                                                                                                                                                                                        0.1s 
     => => transferring context: 2B                                                                                                                                                                                          0.0s 
     => [internal] load metadata for docker.io/apache/spark:latest                                                                                                                                                           2.0s 
     => [auth] apache/spark:pull token for registry-1.docker.io                                                                                                                                                              0.0s 
     => [internal] load build context                                                                                                                                                                                        0.0s 
     => => transferring context: 2.99kB                                                                                                                                                                                      0.0s 
     => [ 1/10] FROM docker.io/apache/spark@sha256:0ed5154e6b32ac3af1272d4d65e9f65b13afcfe80b41ad10bd059bcd6317863c                                                                                                          0.0s 
     => CACHED [ 2/10] RUN apt-get update --yes;     apt-get install --yes --no-install-recommends     "openjdk-11-jre-headless"     ca-certificates-java curl awscli;     set -ex;     apt-get install -y python3 python3-  0.0s 
     => CACHED [ 3/10] WORKDIR /opt                                                                                                                                                                                          0.0s 
     => CACHED [ 4/10] RUN wget -q "https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz" &&     echo "2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B7  0.0s 
     => CACHED [ 5/10] RUN cp -p "/opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf.template" "/opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf" &&     echo 'spark.driver.extraJavaOptions -Dio.netty.tryRefl  0.0s 
     => CACHED [ 6/10] RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar -P "/opt/spark-3.1.2-bin-hadoop3.2/jars/" &&     wget https://repo1.maven.org/maven2/com/amazonaws/a  0.0s 
     => CACHED [ 7/10] RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "/opt/spark-3.1.2-bin-hadoop3.2/jars/"                                                                   0.0s 
     => CACHED [ 8/10] WORKDIR /workspace/pyspark                                                                                                                                                                            0.0s 
     => [ 9/10] COPY . /workspace/pyspark                                                                                                                                                                                    0.0s 
     => [10/10] RUN pip3 install --no-cache-dir -r requirements.txt;     rm -rf /opt/spark                                                                                                                                  40.9s 
     => exporting to image                                                                                                                                                                                                   2.5s 
     => => exporting layers                                                                                                                                                                                                  2.4s 
     => => writing image sha256:ace5e474a844c501f17220ffaf8dd9aa73004ea36975fd2279e9f634332a969e                                                                                                                             0.0s 
     => => naming to docker.io/library/hyunsoolee0506/spark:3.5.0
  • Spark Operator가 실행되면서 해당 이미지를 사용할 수 있도록 Dockerhub로 push 해준다.
# Docker Push
  > docker push hyunsoolee0506/spark:3.5.0
  
    .
    .
    .
    .
    1646200a958a: Mounted from apache/spark
    6d3bc029a93b: Mounted from apache/spark
    59d98d956092: Mounted from apache/spark
    a295ff97538c: Mounted from apache/spark
    0b4923f16514: Mounted from apache/spark
    954c82bdeb5f: Mounted from apache/spark
    3.5.0: digest: sha256:7ac6bcda10daf05e474be01944b20bdddc6b58cd49c813aeee58c2e7868d4a02 size: 4516
  • 이렇게 되면 현재 hyunsoolee0506/spark:3.5.0 Docker Image의 /workspace/pyspark 디렉토리 이하 구조는 아래와 같을 것이다.

    pyspark/
    ├── Dockerfile
    ├── names.csv
    ├── ncloud-aws.py
    ├── ncloud-nos.py
    └── requirements.txt

3. SparkApplication 실행 후 확인

  • 위에서 만든 hyunsoolee0506/spark:3.5.0 이미지를 사용해서 Spark Operator를 실행할 것이다. 그렇게 되면 작업이 진행되면서 해당 이미지를 사용한 driver와 executor pod가 만들어질 것이고 이미지 내에 spark 작업과 cloud 저장소와 통신할 수 있는 구성 파일들이 모두 포함되어 있기 때문에 문제없이 작업이 완료 될 것이다.

pyspark-job-ncp.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-job-ncp
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: hyunsoolee0506/spark:3.5.0
  imagePullPolicy: Always
  mainApplicationFile: local:///workspace/pyspark/ncloud-nos.py
  sparkVersion: "3.5.0"
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.0
    serviceAccount: spark-spark-operator
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.0
  • SparkApplication YAML 파일을 apply 해주면 아래와 같이 pod가 Running -> Completed 되면서 작업이 잘 수행되는 것을 볼 수 있다.

    • RUNNING
    • COMPLETED
  • 또한 Ncloud OS 버킷을 확인해보면 ncloud-nos.py 파일에서 명시한 경로 아래에 데이터가 잘 저장된 것을 확인할 수 있다. (당연히 ncloud-aws.py 스크립트를 명시하여 실행하면 AWS S3 버킷에 데이터가 저장되게된다.)

4. Spark Operator에 환경변수 넘겨주기

  • 2번 섹션에서 작성한 pyspark 스크립트에는 사실 큰 문제점이 한 가지 있다. 바로 API KEY ID와 SECRET KEY가 모두 하드코딩되어 있는 것이다. 그렇게 되면 해당 이미지로 컨테이너 생성 후 내부로 접근하여 파일을 열어보면 비밀번호가 바로 보이기 때문에 보안상 매우매우 좋지 않다.
  • 따라서 SparkApplication을 execute 하는 순간 해당 KEY의 정보들을 driver와 executor pod에 환경 변수로 넘겨주어 내부 코드 실행시 환경 변수를 참조하여 값을 가져올 수 있도록 설정해줄 것이다.
  1. 우선 pyspark 스크립트 코드를 변경한다. os.environ.get("NCP_ACCESS_KEY_ID") 코드를 사용하여 환경 변수에서 값을 가져오도록만 고쳐준다.

    
    from pyspark.sql import SparkSession
    import os
    
    spark = SparkSession.builder \
            .master("local") \
            .appName("hadoop-ncloud") \
            .getOrCreate()
    
    sc = spark.sparkContext
    access_key = os.environ.get("NCP_ACCESS_KEY_ID")
    secret_key = os.environ.get("NCP_SECRET_ACCESS_KEY")
    
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")
    
    df = spark.read.csv(os.path.join(os.getcwd(), 'names.csv'), header=True)
    
    df.write.mode('overwrite').parquet(f's3a://[NOS-BUCKET]/custom/dir/path')
  2. API KEY 정보를 담은 secret 생성

    kubectl create secret generic ncp-secret-key \
            --from-literal=NCP_ACCESS_KEY_ID=[KEY_ID] \
            --from-literal=NCP_SECRET_ACCESS_KEY=[KEY_SECRET]
  3. 넘겨줄 env 변수가 secret을 참조하도록 SparkApplication YAML 파일 수정

    
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: custom-job-ncp2
    spec:
      type: Python
      pythonVersion: "3"
      mode: cluster
      image: hyunsoolee0506/spark:3.1.2-v2
      imagePullPolicy: Always
      mainApplicationFile: local:///workspace/pyspark/nos-read-write.py
      sparkVersion: "3.5.0"
      driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
        labels:
          version: 3.5.0
        serviceAccount: spark-spark-operator
        envFrom:
          - secretRef:
              name: ncp-secret-key
      executor:
        cores: 1
        instances: 1
        memory: "512m"
        labels:
          version: 3.5.0
        envFrom:
          - secretRef:
              name: ncp-secret-key
  • 이후에 YAML 파일을 apply 해보면 secret에 등록한 변수들이 환경변수로 잘 넘어가고 pyspark 코드에서도 API KEY 관련 정보들을 받아와서 코드가 잘 실행되는 것을 확인해볼 수 있다.

4. SparkApplication 스케쥴링 작업

  • spark-on-k8s-operator에서는 Spark Operator에 대한 스케쥴링 작업이 가능하도록 ScheduledSparkApplication 종류의 API 오브젝트도 지원을 해주고 있다. 인자들이 몇 가지 추가되고 포맷은 거의 비슷하다.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
  name: pyspark-job-schedule
spec:
  schedule: "@every 1m"
  concurrencyPolicy: Allow
  template:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: hyunsoolee0506/spark:3.5.0
    imagePullPolicy: Always
    mainApplicationFile: local:///workspace/pyspark/nos-read-write.py
    sparkVersion: "3.5.0"
    timeToLiveSeconds: 120
    driver:
      cores: 1
      coreLimit: "1200m"
      memory: "512m"
      labels:
        version: 3.5.0
      serviceAccount: spark-spark-operator
      envFrom:
        - secretRef:
            name: ncp-secret-key
    executor:
      cores: 1
      instances: 1
      memory: "512m"
      labels:
        version: 3.5.0
      envFrom:
        - secretRef:
            name: ncp-secret-key
  • apply를 해준 후 ScheduledSparkApplication 리소스를 확인해보면 아래와 같이 job에 대해 스케쥴링이 되어있는 것을 확인할 수 있으며 지정된 시간이 지나면 자동으로 작업이 진행된다.

5. OUTRO

  • 이번 글에서는 1부, 2부, 3부 에서 다뤘던 내용들을 종합하여 Kubernetes native하게 Pyspark 스크립트를 실행하고 해당 코드의 결과물을 Cloud Storage에 저장까지 해보았다.
  • spark-on-k8s-operator에 대해 다뤄진 글들이 그렇게 많지는 않아서 매 번 에러를 만날때마다 그나마 친절하게 작성된 Docs를 참고하거나 굉장히 오랜 고민의 시간들을 가졌다. 특히 hadoop과 aws를 연동하는 hadoop-aws, aws-java-sdk, JetS3t 등의 패키지들을 spark에 설치할 때 호환되는 버전이 맞지 않아서 에러가 나는 경우가 상당히 많았다. 참고했던 블로그에서 사용한 버전은 spark 3.1.2였는데 최신 버전인 3.5.0으로 올려서 사용하고 싶은 욕심이 계속 들어 그 패키지들 버전 관련 에러 해결에만 상당한 시일이 걸렸다.....
  • 그 외에도 진행하는 과정에서 수많은 에러들을 마주했지만 차근차근 해결해나가다 보니 내가 처음 생각했던 최종 이미지와 같이 spark job이 작동을 하게 되었고 그 과정에서 내 스스로가 약간은 성장한 것 같은 느낌이 들어 뿌듯하기도 하였다.
  • Spark를 K8S에서 사용하는 더 효율적인 방법들이 분명히 더 있겠지만 우선은 지금껏 탐구해본 spark-on-k8s-operator에 대한 내용들을 조금 더 발전시켜 쿠버네티스상에서 Spark Data Pipeline을 구축하여 운영해볼 수 있도록 할 것이다.

6. 참고자료

profile
데이터 엔지니어의 작업공간 / #PYTHON #SPARK #AWS #NCLOUD
post-custom-banner

3개의 댓글

comment-user-thumbnail
2024년 5월 28일

dockerfile 커스텀 하실 때 requirements.txt에 무슨 내용이 들어갔는지도 알 수 있을까요?

1개의 답글