[k8s] Submitting a Spark app with Airflow's SparkKubernetesOperator

Woong · January 22, 2026

Docker, k8s

  • Define the Spark config
from typing import List, Optional

# Spark job defaults
SPARK_NAMESPACE = "spark"
SPARK_JOB_SERVICE_ACCOUNT = "spark-job-sa"
SPARK_JOB_NODE_SELECTOR = {"nodegroup": "<NODE_GROUP>"}
SPARK_JOB_TOLERATIONS = [
    {
        "key": "nodegroup",
        "operator": "Equal",
        "value": "<NODE_GROUP>",
        "effect": "NoSchedule",
    }
]
SPARK_USER = "aibatch_svc"
HADOOP_CONF = {
    "fs.defaultFS": "hdfs://<namenode_ip>:<port>",
    "dfs.client.socket-timeout": "20000",
    "dfs.client.retry.policy.enabled": "false",
    "dfs.client.use.datanode.hostname": "false",
    "dfs.client.read.shortcircuit": "false",
    "ipc.client.connect.timeout": "120",
    "ipc.client.connect.max.retries": "3",
    # S3A: resolve AWS credentials from the service account's web identity token (EKS IRSA)
    "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}

def get_spark_application_spec(
    app_name: str,
    job_name: str,
    container_image: str,
    main_application_filename: str,
    arguments: Optional[List[str]] = None,
):
    """
    container_image: ECR container image
    main_application_filename: path to the main application file (e.g. /app/main.py)
    """
    # Avoid the mutable-default-argument pitfall.
    arguments = arguments or []

    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {
            "name": job_name,
            "namespace": SPARK_NAMESPACE,
            "labels": {
                "app": app_name,
                "managed-by": "airflow",
            },
        },
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "sparkVersion": "3.5.1",
            "image": container_image,
            "imagePullPolicy": "IfNotPresent",
            "mainApplicationFile": f"local://{main_application_filename}",
            "arguments": arguments,
            "driver": {
                "cores": 1,
                "memory": "1g",
                "serviceAccount": SPARK_JOB_SERVICE_ACCOUNT,
                "nodeSelector": SPARK_JOB_NODE_SELECTOR,
                "tolerations": SPARK_JOB_TOLERATIONS,
            },
            "executor": {
                "instances": 2,
                "cores": 1,
                "memory": "1g",
                "serviceAccount": SPARK_JOB_SERVICE_ACCOUNT,
                "nodeSelector": SPARK_JOB_NODE_SELECTOR,
                "tolerations": SPARK_JOB_TOLERATIONS,
            },
            "restartPolicy": {"type": "Never"},
        
            "sparkConf": {
                "spark.driver.extraClassPath": "/opt/spark/jars/hadoop-aws-3.3.4.jar:/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
                "spark.executor.extraClassPath": "/opt/spark/jars/hadoop-aws-3.3.4.jar:/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",

                "spark.kubernetes.driverEnv.HADOOP_USER_NAME": SPARK_USER,
                "spark.kubernetes.executorEnv.HADOOP_USER_NAME": SPARK_USER,
                "spark.kubernetes.driverEnv.SPARK_USER": SPARK_USER,
                "spark.kubernetes.executorEnv.SPARK_USER": SPARK_USER,
            },
            "hadoopConf": HADOOP_CONF,
        }
    }
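
Before wiring the spec into a DAG, it can help to render it once and eyeball the resulting SparkApplication manifest. A minimal sketch, assuming PyYAML is available (the image, job name, and arguments below are placeholders, not values from this post). Note that WebIdentityTokenCredentialsProvider assumes the spark-job-sa service account is bound to an IAM role via EKS IRSA.

import yaml  # PyYAML

# Render the spec with placeholder values and print it as YAML
# so it can be checked against the SparkApplication CRD fields.
spec = get_spark_application_spec(
    app_name="spark-app",
    job_name="spark-app-test",  # placeholder job name
    container_image="<account>.dkr.ecr.<region>.amazonaws.com/spark-app:latest",
    main_application_filename="/app/main.py",
    arguments=["--date", "2026-01-01"],
)
print(yaml.safe_dump(spec, sort_keys=False))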
  • Define the DAG
from __future__ import annotations

from airflow import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
import pendulum

DAG_ID = "epic_sensor_analysis"

account_id = Variable.get("account_id")
image_version = "latest"

CONTAINER_IMAGE = f"..."

APP_NAME = "spark-app"
# Inside an f-string, "{{ ds_nodash }}" would render as the literal "{ ds_nodash }",
# which Jinja cannot template, so build the name with plain concatenation.
# k8s resource names must be lowercase (RFC 1123), hence the lower filter on ts_nodash.
JOB_NAME = APP_NAME + "-{{ ds_nodash }}-{{ ts_nodash | lower }}"
base_date = "{{ data_interval_start.in_timezone('Asia/Seoul').strftime('%Y-%m-%d') }}"

arguments = [
    "--date",
    base_date
]

main_application_filename = "/app/main.py"

spark_application_spec = get_spark_application_spec(
    app_name=APP_NAME,
    job_name=JOB_NAME,
    container_image=CONTAINER_IMAGE,
    main_application_filename=main_application_filename,
    arguments=arguments,
)

spark_application_spec["spec"]["driver"]["cores"] = 1
spark_application_spec["spec"]["driver"]["memory"] = "1g"
spark_application_spec["spec"]["executor"]["instances"] = 5
spark_application_spec["spec"]["executor"]["cores"] = 2
spark_application_spec["spec"]["executor"]["memory"] = "6g"


with DAG(
    dag_id=DAG_ID,
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    schedule="11 0 * * *",
    catchup=False,
    max_active_runs=1,
) as dag:

    submit = SparkKubernetesOperator(
        task_id="submit",
        namespace=SPARK_NAMESPACE,
        kubernetes_conn_id="kubernetes_default",
        template_spec=spark_application_spec,
        get_logs=True,
        delete_on_termination=True,
        reattach_on_restart=True,
    )
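
With get_logs=True the task streams the driver log and fails if the Spark application fails, so no separate sensor task is used here. For ad-hoc debugging while an application is running, its status can also be read straight from the SparkApplication custom resource. A rough sketch using the official kubernetes Python client, assuming kubeconfig access to the cluster (the name is the rendered JOB_NAME of a given run):

from kubernetes import client, config

config.load_kube_config()  # or load_incluster_config() when running in-cluster
api = client.CustomObjectsApi()

# Read the SparkApplication custom resource created by the operator.
app = api.get_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark",
    plural="sparkapplications",
    name="<job-name>",  # placeholder: the rendered JOB_NAME
)
print(app.get("status", {}).get("applicationState", {}))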
