[K8S-SPARK 3부] Pyspark Container 환경에서 Cloud Storage에 결과물 저장하기

NewNewDaddy·2023년 12월 24일
0

SPARK

목록 보기
8/16
post-thumbnail
post-custom-banner

Spark-on-K8S 시리즈 목차

0. INTRO

  • 앞선 글(1부, 2부)에서는 spark-on-k8s-operator를 helm을 이용해 설치하여 쿠버네티스 환경에서 커스텀한 pyspark 스크립트를 실행하는 과정들에 대해서 다뤄보았다. 사용한 예제들은 스크립트 내에서 특정 값을 계산한다던가, 특정 경로에 있는 파일을 읽어와 dataframe으로 만든 후 해당 dataframe을 출력하는 등 최종적인 테이블 형태를 어딘가에 저장하는 과정은 딱히 없었다.
  • 하지만 데이터 파이프라인이 의미를 가지려면 Extract되고 Transform된 데이터가 로컬 저장소가 되었든, S3나 Ncloud OS와 같은 Cloud Storage가 되었든 어딘가에 저장이 되어야 했다. 그래야만 그 이후의 코드가 정제되어 저장된 데이터 경로를 읽어와 다시 추가적인 가공을 하거나 할 것이기 때문이다.
  • 이번 글에서는 쿠버네티스상에 구성된 Spark 환경에서 데이터를 정제하고 특정 Cloud Storage에 정제된 데이터를 저장하는 과정을 다루어 볼 것이다.
  • 앞서 사용하였던 SparkApplication YAML 파일로 스크립트를 실행하진 않고 우선은 Spark 사용이 가능한 Jupyter Lab 환경에서 어떤식을 코드를 구성하고 configure를 주어야 Container 환경에서 돌아가는 코드가 클라우드 저장소에 데이터를 저장할 수 있는지 알아보도록 할 것이다.

1. hadoop-aws에 대하여

  • Spark 환경에서 AWS와 통신을 할 수 있도록 하려면 hadoop-aws라는 Maven artifact가 필요하다. 이는
    "Apache Hadoop을 AWS와 통합할 수 있게 해주는 라이브러리"
    이렇게 정의할 수 있으며 이 라이브러리를 통해 Hadoop Cluster에서 AWS S3와 같은 AWS 서비스와 상호작용을 할 수 있는 것이다.

  • 해당 라이브러리에서 지원하는 주요 기능들은 크게 아래와 같다.

    1. S3 파일 시스템 지원: hadoop-aws는 Hadoop 클러스터에서 AWS S3를 파일 시스템으로 사용할 수 있도록 지원합니다. 따라서 Hadoop 작업은 S3에 저장된 데이터를 읽거나 쓸 수 있습니다.

    2. AWS 인증 및 자격 증명 관리: AWS 서비스에 접근할 때 사용되는 AWS 계정 자격 증명 및 인증에 관련된 기능을 제공합니다.

    3. AWS 리소스 관리: Hadoop 작업에서 AWS 리소스를 효과적으로 관리할 수 있도록 도와줍니다.

  • 관련 자료

1. Dockerfile 생성

  • 아래 글에 도움되는 기능들에 대한 소개 및 실습에 대한 내용들이 잘 정리되어 있어 해당 글을 참고하여 실습을 진행하였다.

  • 해당 글에 나온 Dockerfile 코드를 동일하게 사용하진 않고 SPARK_HOME, requirements.txt 파일 등 약간의 수정을 하여 사용하였다.

  • Dockerfile에서 설치되는 라이브러리들은 크게 아래와 같다.

    • apache-spark 3.1.2 - apache spark 공식 zip 파일
    • hadoop-aws 3.2.0 - Apache Hadoop을 AWS와 통합할 수 있게 해주는 라이브러리
    • aws-java-sdk 1.11.375 - AWS와 상호작용하기 위한 Java 공식 소프트웨어 개발 키트
    • JetS3t - Java로 작성된 Amazon S3와 상호 작용하기 위한 라이브러리 및 도구 키트
  • 또한 Base Image로 `jupyter/scipy-notebook'을 사용하고 있기 때문에 jupyter lab 환경을 위한 8888 포트가 Expose되어 있어 해당 포트로 접속하여 Notebook 환경으로 들어갈 수 있다.

Dockerfile

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
ARG OWNER=jupyter
ARG BASE_CONTAINER=$OWNER/scipy-notebook
FROM $BASE_CONTAINER

LABEL maintainer="Jupyter Project <jupyter@googlegroups.com>"

# Fix DL4006
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

USER root

# Spark dependencies
# Default values can be overridden at build time
# (ARGS are in lower case to distinguish them from ENV)
ARG spark_version="3.1.2"
ARG hadoop_version="3.2"
ARG spark_checksum="2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B70FAE62B5A96146F0421CB93D4E51491308EF5D5"
ARG openjdk_version="11"
ARG aws_hadoop_version="3.2.0"
ARG aws_sdk_version="1.11.375"


ENV APACHE_SPARK_VERSION="${spark_version}" \
    HADOOP_VERSION="${hadoop_version}" \
    AWS_SDK_VERSION="${aws_sdk_version}" \
    AWS_HADOOP_VERSION="${aws_hadoop_version}"
ENV JUPYTER_ENABLE_LAB=yes
RUN apt-get update --yes && \
    apt-get install --yes --no-install-recommends \
    "openjdk-${openjdk_version}-jre-headless" \
    ca-certificates-java  \
    curl \
    awscli && \
    apt-get clean && rm -rf /var/lib/apt/lists/*


# Install requirements
COPY requirements.txt ./
RUN pip3 install --upgrade pip  && pip3 install --no-cache-dir -r requirements.txt
RUN apt-get clean && rm requirements.txt


# Spark installation
WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
    tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"

WORKDIR /usr/local

# Configure Spark
ENV SPARK_HOME=/opt/spark
ENV SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
    PATH="${PATH}:${SPARK_HOME}/bin"

RUN ln -s "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" spark && \
    # Add a link in the before_notebook hook in order to source automatically PYTHONPATH
    mkdir -p /usr/local/bin/before-notebook.d && \
    ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh

# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"

# Add hadoop-aws and aws-sdk
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${AWS_HADOOP_VERSION}/hadoop-aws-${AWS_HADOOP_VERSION}.jar -P "${SPARK_HOME}/jars/" && \
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P "${SPARK_HOME}/jars/"
# Adding JetS3t libary
RUN wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar -P "${SPARK_HOME}/jars/"


# Install pyarrow
RUN mamba install --quiet --yes \
    'pyarrow' && \
    mamba clean --all -f -y && \
    fix-permissions "${CONDA_DIR}" && \
    fix-permissions "/home/${NB_USER}"
USER ${NB_UID}


WORKDIR "${HOME}"
# Dockerfile 빌드 진행

  > docker build -t hyunsoolee0506/pyspark:aws0.1 .

2. Jupyter Notebook 환경 구성 및 접속

  • 위에서 생성한 이미지를 바탕으로 Deployment와 loadBalancer 타입의 Service를 생성하여 Jupyter 환경으로 접속을 할 것이다.

jupyter-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: jupyter-deployment
spec:
  selector:
    matchLabels:
      app: jupyter
  replicas: 1
  template:
    metadata:
      labels:
        app: jupyter
    spec:
      containers:
        - name: jupyter-container
          image: hyunsoolee0506/pyspark:aws0.1
          ports:
            - containerPort: 8888

jupyter-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: jupyter-service
  labels:
    app: jupyter
spec:
  type: LoadBalancer
  selector:
    app: jupyter
  ports:
  - protocol: TCP
    port: 8888
    targetPort: 8888
  • 위의 YAML 파일들을 실행시키면 Cloud에 Load Balancer가 하나 자동으로 생성된다. 현재 클러스터는 Ncloud의 NKS Cluster에서 작업을 하고 있기 때문에 Naver Cloud Platform에 접속하여 Load Balancer URL을 확인하고 Load Balancer URL:8888로 접속한다.

  • Token의 경우 jupyter-deployment pod의 로그를 출력해보면 확인할 수 있다.


> kubectl logs [pod 이름] | grep token

  [I 2023-12-23 15:56:43.312 ServerApp] http://jupyter-deployment-74d6fd7f87-45mp9:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
  [I 2023-12-23 15:56:43.312 ServerApp] http://127.0.0.1:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
          http://jupyter-deployment-74d6fd7f87-45mp9:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b
          http://127.0.0.1:8888/lab?token=00a49d8b27b2f0a95e4343ac46e022c6ef14755dccbfc23b

3. Pyspark 코드 실행하여 S3에 테이블 저장

  • Spark 코드에서 Cloud Storage와 통신하려면 코드상으로 hadoopConfiguration에 대한 설정이 필요하다. 현재 사용중인 Ncloud의 Object Storage는 감사하게도 AWS S3와 동일한 CLI와 Python SDK를 사용하기 때문에 S3에 대한 설정값과 거의 동일하게 설정을 해주면 잘 작동하는 것을 확인할 수 있다. 아래 예제 코드에서는 AWS S3 저장 관련된 configure 설정Ncloud의 Object Storage 저장 관련된 configure 설정 두 가지를 다룰 것이다.
  1. AWS S3 저장 관련된 configure 설정
  • 최신 Hadoop 버전에서는 s3a 프로토콜을 통한 S3 통합이 가장 잘 지원되므로 S3 주소를 쓸 때 s3a:// 형식으로 쓰도록 한다.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder \
        .master("local") \
        .appName("aws") \
        .getOrCreate()


access_key = "[AWS KEY ID]"
secret_key = "[AWS SECRET KEY]"

# hadoop configure 설정
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

schema = T.StructType(
            [
                T.StructField("col_a", T.IntegerType(), True),
                T.StructField("col_b", T.IntegerType(), True),
                T.StructField("col_c", T.IntegerType(), True)
            ]
        )

rows = [(1,2,3), (4,5,6), (7,8,9)]

df = spark.createDataFrame(rows, schema)

df_new.write.mode('overwrite').parquet('s3a://[S3 Bucket]/your/dir/path')
  1. Ncloud Object Storage 저장 관련된 configure 설정
  • Ncloud의 hadoop 관련 configure 설정은 공식 Docs에서 확인할 수 있다. -> Ncloud Docs - Hive 사용

    
    fs.s3a.access.key=<API-ACCESS-KEY>
    fs.s3a.connection.ssl.enabled=false
    fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
    fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.secret.key=<API-SECRET-KEY>
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder \
        .master("local") \
        .appName("aws") \
        .getOrCreate()


access_key = "[NCP KEY ID]"
secret_key = "[NCP SECRET KEY]"

# hadoop configure 설정
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Ncloud 관련 추가 configure 설정
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://kr.objectstorage.ncloud.com")

schema = T.StructType(
            [
                T.StructField("col_a", T.IntegerType(), True),
                T.StructField("col_b", T.IntegerType(), True),
                T.StructField("col_c", T.IntegerType(), True)
            ]
        )

rows = [(1,2,3), (4,5,6), (7,8,9)]

df = spark.createDataFrame(rows, schema)

df_new.write.mode('overwrite').parquet('s3a://[NOS Bucket]/your/dir/path')
  • 위의 코드들을 실행하면 각각 AWS S3와 Ncloud Object Storage에 사용자가 설정한 경로로 샘플 데이터가 저장된 것을 확인할 수 있다.

4. OUTRO

  • 이번 글에서는 hadoop 환경에서 AWS와 통신할 수 있는 라이브러리 + Jupyter Lab 작업 환경이 포함된 Dockerfile을 만들고 이미지 생성 후 해당 이미지를 사용한 Container 환경에서 Pyspark 코드를 통해 AWS와 NCP의 Cloud Storage에 데이터를 저장해보는 과정을 다루었다.
  • Container 환경에서 Cloud로 데이터를 저장하는 로직을 살펴보고 결과를 확인하기 위해 Jupyter 환경에서 작업을 하였지만 최종적으로는 1부2부에서 다뤘던 것 같이 쿠버네티스 환경에서 SparkApplication YAML 파일이 실행되어 Spark 작업이 진행되면서 저장되도록 프로세스가 엮여야 할 것이다.
  • 이번 글에서 사용한 Docker Image를 그대로 사용하여 작업을 실행하면 될 것이라 생각하였지만 진행하는 과정에서 수많은 에러들을 만나게 되었고 Docker Image에 대한 제법 많은 수정이 이루어졌다. 마주쳤던 모든 Trial and Error들을 다 다룰 수는 없겠지만 "Kubernetes Native하게 Pyspark Job을 실행하고 Cloud에 데이터를 저장" 하는 과정의 마지막 내용들을 다음 글에서 다뤄보고 SparkApplication 작업을 스케쥴링하는 방법도 추가적으로 다뤄볼 것이다.
profile
데이터 엔지니어의 작업공간 / #PYTHON #SPARK #AWS #NCLOUD
post-custom-banner

0개의 댓글