
PythonOperator을 통해 airflow환경에서 외부 데이터를 수집하고 로컬에 저장하는 과정까지 완성이 됐다.
ETL 과정중 T에 해당하는 Transform에 해당하는 과정을 수행해야 하는데 apache제단의 pyspark 를 이용하여 데이터를 정제해보려고 한다.
pyspark로 작성된 sparkjob을 에어플로우로 실행하는 방법은 크게 두가지가 있다. BashOperator과 SparkSubmitOperator가 있는데 전자의 방식을 선택하고자 하며 이유는 다음과 같다.
BashOperator : Bash_command를 통해 리눅스 환경에서의 명령어를 실행시킬 수 있다.
나의 경우 spark-submit할 내용과 master 노드에 제출할 config들을 한번에 정리해놓는 것이 관리측면에서 편하다고 생각하여 .sh스크립트로 작성해보려 한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
from datetime import datetime
file_name = "/opt/airflow/jobs/sparktest.py"
with DAG(
dag_id = "dags_bash_operator_test",
start_date=pendulum.datetime(2024, 12, 12, tz = "Asia/Seoul"),
schedule="30 10 * * *",
catchup=False
) as dag:
task1 = BashOperator(
task_id = "task1",
bash_command=f'/opt/airflow/plugins/sparktest.sh {file_name}'
)
task1
BashOperator로 실행한 shell 스크립트 :
#!/bin/bash
script=$1
echo "${script}"
echo "start spark submit with bash operator"
echo "${PATH}"
#echo "${SPARK_HOME}"
spark-submit \
--master spark://spark-master:7077 ${script}
당연스럽게도 spark를 사용하기 위해 spark 컨테이너를 빌드해야한다. airflow와 한번에 컨테이너를 올리고 싶어서 compose파일에 이것저것 추가해주었다.
기존에 서비스중인 airflow의 포트와 겹치지 않게 port를 추가해주었고 worker 노드도 master노드의 포트를 명시적으로 작성해주었다.
(이부분에서 7077인데 7070으로 작성해놓고 로그도 보지않고 컨테이너가 올라가지 않아서 한참동안 삽질했다..)
로컬에 있는 데이터를 가져다 써야하기 때문에 spark도 로컬의 data디렉토리를 읽을수 있게 해주었다.
jobs 디렉토리는 python으로 작성된 sparkjob이 위치할 폴더이다. spark-submit을 할때 해당 디렉토리의 python파일을 읽어갈 예정이다.
또한 network가 추가됐는데, 이는 container간에 연결망이다.
airflow는 master노드에게 spark-submit할 내용을 제출해야 하는데 이때 network가 필요하다.
x-spark-common:
&spark-common
image: bitnami/spark:latest
volumes:
- ./jobs:/opt/bitnami/spark/jobs
- ./data:/opt/bitnami/spark/data
networks:
- default-net
services:
spark-master:
<<: *spark-common
command: bin/spark-class org.apache.spark.deploy.master.Master
environment:
- SPARK_MODE=master
- SPARK_MASTER_PORT=7077
- SPARK_MASTER_WEBUI_PORT=8080
ports:
- "7077:7077"
- "9080:8080"
# - "6066:6066"
spark-worker:
<<: *spark-common
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_WEBUI_PORT=8081
depends_on:
- spark-master
spark개발환경을 제공한다.spark를 사용할 것이기 때문에 그때그때 로컬에서 작성한 sparkjob을 pull해서 실행하고,, 하는 것은 매우 비효율적이다.pyspark문법을 익히며 개발에 아주 큰 도움을 줄수 있을것 같은 컨테이너도 추가한다. jupyterlab:
image: jupyter/pyspark-notebook:latest
ports:
- "8888:8888"
volumes:
- ./notebooks:/home/jovyan/work
- ./data:/home/jovyan/data
environment:
- JUPYTER_ENABLE_LAB=yes
- GRANT_SUDO=yes
- JUPYTER_TOKEN=password
depends_on:
- spark-master
network설정도 해준다.networks:
default-net:
spark는 common으로 한번에 설정이 됐고, airflow도 common에 단순히 추가하면 된다고 생각을 했는데,,
airflow의 구성요소인 schedule과 webserver만 해당airflow-common 부분에 depend 되어 있어서
아래는 데이터베이스에도 제대로 networks를 설정해준 모습이다.
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
networks:
- default-net
redis:
# Redis is limited to 7.2-bookworm due to licencing change
# https://redis.io/blog/redis-adopts-dual-source-available-licensing/
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always
networks:
- default-net
airflow에서 spark-submit을 하기 위해서는 여러 패키지들을 필요로 한다. docker-compose파일만으로는 한계가 있으므로 dockerfile을 작성해서 빌드해야 하는데, 이부분에 대한 과정은 아래에 정리해두었다.dockerfile을 작성하고 그 과정에서 생겼던 문제들을 정리해보려고 한다.
dags작성하고spark-submit하려 하는데 32번청도 fail 뜨고 성공했다.
해당 문제들을 해결하는 과정에서
dockerfile과 docker-compose파일의 차이점dockerfile의 필요성dockerfile vs docker-compose.yaml다음과 같이 정리가 되었다.
dockerfile : 이미지를 만들기 위한 틀이며, 단순히 공식 image를 끌어다가 build 하는 방식보다 여러 package를 내가 설정해서 image를 빌드할 수 있다는 장점이 있다.docker-compose : 여러 컨테이너를 단순히 한번에 올리는 방법이며image를 끌어다쓰는 방식과dockerfile을 이용하여 빌드하는 방식docker-compose.yaml파일x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3}
environment:
...
해당 방식은 단순히 image를 이용하여 airflow를 사용할때 쓸 수 있지만 spark와 같은 second party앱을 사용하고 싶을때는 유용하지 않다.
이를 간과한채 compose 파일에서 network가 설정됐다는 이유로 airflow에서 spark-submit을 사용할 수 있을 것이라 생각했다.
결과 :
spark-submit명령어를 찾을수 없다는log가 출력된다.
dockerfile을 이용하여 컨테이너를 빌드하는 방법airflow컨테이너를 빌드하겠다는게 아니고, dockerfile로 빌드하는 방법을 택하겠다는 docker-compose파일이다. x-airflow-common:
&airflow-common
build:
context: .
dockerfile: Dockerfile
environment:
...
dockerfile은 해당 템플릿을 작성하기위한 문법을 필요로한다.FROM apache/airflow:2.10.3
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
openjdk-17-jre-headless \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
USER airflow
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-providers-apache-spark==2.1.3
spark를 사용하려면 java 패키지를 필요로한다!**
그래서 이미지를 dockerfile로 만들어낼때 함께 해당 패키지를 다운받고, airflow컨테이너 내부에 해당 파일을 위치시키며, 설치하는 것이 효율적이다.
https://airflow.apache.org/docs/docker-stack/build.html
dockerfile로 빌드중 발생한 trouble
airflow버전과 맞지않은 패키지를 다운받을때 (호환하지않는 패키지를 다운받으려할때 발생하는) 에러를 발생시킨다.
이는 공식문서 확인을 필요로한다.JAVA패키지를 제대로 설치하지 않았을때 오류
- 이외에도
airflow의 몇버전 이상을 쓸때는spark해당패키지의 몇 버전이상을 사용해달라는 로그가 나오기도 한다.
dockerfile중 이부분을 수정하여 맞춰주었다.RUN ... apache-airflow-providers-apache-spark==2.1.3
dockerfile을 작성하면서 docker의 이점에 대해서 확인해볼 수 있었다.
컨테이너의 제거와 생성
패키지 혹은 특정 프로그램의 버전이 맞지 않는다면
단순히 다시 dockerfile을 작성해서 버전을 맞춰주어 컨테이너를 쉽게 버리고 다시 빌드하고,
특정환경에 맞게 바꿀수 있다는 점을 확인할 수 있었다
log확인의 중요성
너무 당연한 얘기지만 log에서 어떤 부분을 고치면 좋겠다는 메세지를 준다. 그렇지 않은 부분도 있겠지만 트러블슈팅의 가장 기본적인 부분이라 생각이 들었다.
sparkjob을 수행할(데이터 모델, 테이블을 생성할) pythonfile 작성 (해당 장에서 사용한 파이썬 파일은 단순히 schema를 출력하는 파일이었다)pythonfile을 수행할 shellscript정리(spark optimization?)