from __future__ import annotations

from typing import List, Optional

SPARK_NAMESPACE = "spark"
SPARK_JOB_SERVICE_ACCOUNT = "spark-job-sa"
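# Schedule Spark driver/executor pods onto a dedicated node group: the
# nodeSelector targets it and the toleration matches its NoSchedule taint.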
SPARK_JOB_NODE_SELECTOR = {"nodegroup": "<NODE_GROUP>"}
SPARK_JOB_TOLERATIONS = [
{
"key": "nodegroup",
"operator": "Equal",
"value": "<NODE_GROUP>",
"effect": "NoSchedule",
}
]
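# Identity and Hadoop client settings injected into each SparkApplication:
# HDFS defaultFS, client timeout/retry behaviour, and S3A access via
# web-identity credentials (e.g. IRSA on EKS).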
SPARK_USER = "aibatch_svc"
HADOOP_CONF = {
"fs.defaultFS": "hdfs://<namenode_ip>:<port>",
"dfs.client.socket-timeout": "20000",
"dfs.client.retry.policy.enabled": "false",
"dfs.client.use.datanode.hostname": "false",
"dfs.client.read.shortcircuit": "false",
"ipc.client.connect.timeout": "120",
"ipc.client.connect.max.retries": "3",
"fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
"fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}
def get_spark_application_spec(
    app_name: str,
    job_name: str,
    container_image: str,
    main_application_filename: str,
    arguments: Optional[List[str]] = None,
):
    """
    container_image: ECR container image
    main_application_filename: path to the main application file (e.g. /app/main.py)
    arguments: arguments passed to the main application
    """
    arguments = arguments or []
return {
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"kind": "SparkApplication",
"metadata": {
"name": job_name,
"namespace": SPARK_NAMESPACE,
"labels": {
"app": app_name,
"managed-by": "airflow",
},
},
"spec": {
"type": "Python",
"mode": "cluster",
"sparkVersion": "3.5.1",
"image": container_image,
"imagePullPolicy": "IfNotPresent",
"mainApplicationFile": f"local://{main_application_filename}",
"arguments": arguments,
"driver": {
"cores": 1,
"memory": "1g",
"serviceAccount": SPARK_JOB_SERVICE_ACCOUNT,
"nodeSelector": SPARK_JOB_NODE_SELECTOR,
"tolerations": SPARK_JOB_TOLERATIONS,
},
"executor": {
"instances": 2,
"cores": 1,
"memory": "1g",
"serviceAccount": SPARK_JOB_SERVICE_ACCOUNT,
"nodeSelector": SPARK_JOB_NODE_SELECTOR,
"tolerations": SPARK_JOB_TOLERATIONS,
},
"restartPolicy": {"type": "Never"},
"sparkConf": {
"spark.driver.extraClassPath": "/opt/spark/jars/hadoop-aws-3.3.4.jar:/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
"spark.executor.extraClassPath": "/opt/spark/jars/hadoop-aws-3.3.4.jar:/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
"spark.kubernetes.driverEnv.HADOOP_USER_NAME": SPARK_USER,
"spark.kubernetes.executorEnv.HADOOP_USER_NAME": SPARK_USER,
"spark.kubernetes.driverEnv.SPARK_USER": SPARK_USER,
"spark.kubernetes.executorEnv.SPARK_USER": SPARK_USER,
},
"hadoopConf": HADOOP_CONF,
}
}
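
# Airflow DAG that submits the SparkApplication once a day via the Spark Operator.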
from airflow import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
import pendulum
DAG_ID = "epic_sensor_analysis"
account_id = Variable.get("account_id")
image_version = "latest"
CONTAINER_IMAGE = f"..."
APP_NAME = "spark-app"
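# Kubernetes object names must be lowercase DNS-1123 names; ts_nodash contains an
# uppercase "T", so it is lowercased in the rendered template below.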
JOB_NAME = APP_NAME + "-{{ ds_nodash }}-{{ ts_nodash | lower }}"
base_date = "{{ data_interval_start.in_timezone('Asia/Seoul').strftime('%Y-%m-%d') }}"
arguments = [
"--date",
base_date
]
main_application_filename = "/app/main.py"
spark_application_spec = get_spark_application_spec(
    app_name=APP_NAME,
    job_name=JOB_NAME,
    container_image=CONTAINER_IMAGE,
    main_application_filename=main_application_filename,
    arguments=arguments,
)
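# Override the helper's default driver/executor resources for this workload.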
spark_application_spec["spec"]["driver"]["cores"] = 1
spark_application_spec["spec"]["driver"]["memory"] = "1g"
spark_application_spec["spec"]["executor"]["instances"] = 5
spark_application_spec["spec"]["executor"]["cores"] = 2
spark_application_spec["spec"]["executor"]["memory"] = "6g"
with DAG(
dag_id=DAG_ID,
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
schedule="11 0 * * *",
catchup=False,
max_active_runs=1,
) as dag:
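    # Submit the SparkApplication, stream driver logs into the task log, and
    # delete the SparkApplication resource when the run finishes.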
submit = SparkKubernetesOperator(
task_id="submit",
namespace=SPARK_NAMESPACE,
kubernetes_conn_id="kubernetes_default",
template_spec=spark_application_spec,
get_logs=True,
delete_on_termination=True,
reattach_on_restart=True,
)