동적으로 DAG 를 만드는 방법 중에 디자인 패턴의 팩토리 패턴의 방법을 참고해서 만들 수 있다.
Factory 패턴을 이용해서 DAG 를 만드는 방식은 아래와 같다.
airflow_dags/
│── dags/
│ ├── my_dags.py # Airflow가 인식할 DAG 파일
│── dag_factory/
│ ├── factory.py # DAG을 생성하는 팩토리 함수 정의
│ ├── task_definitions.py # 개별 태스크 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
def create_dag(dag_id, schedule, default_args, task_configs):
"""
팩토리 패턴을 활용하여 DAG을 생성하는 함수
"""
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
with dag:
for task_id, task_callable in task_configs.items():
PythonOperator(
task_id=task_id,
python_callable=task_callable,
dag=dag
)
return dag
def sample_task():
print("Hello, Airflow!")
# 여러 DAG에서 동일한 태스크를 사용
task_configs = {
"task_1": sample_task,
"task_2": sample_task
}
from datetime import datetime
from dag_factory.factory import create_dag
from dag_factory.task_definitions import task_configs
default_args = {"owner": "airflow", "start_date": datetime(2024, 1, 1)}
# 10개의 DAG을 생성
for i in range(1, 11):
dag_id = f"factory_generated_dag_{i}" # DAG ID 동적 생성
schedule = "@daily" # 모든 DAG의 스케줄 동일
globals()[dag_id] = create_dag(dag_id, schedule, default_args, task_configs)
airflow_dags/
│── dags/
│ ├── generate_dags.py # DAG을 Airflow에 로드하는 Python 파일
│── config/
│ ├── dag_config_1.yaml # DAG 1 정의
│ ├── dag_config_2.yaml # DAG 2 정의
│ ├── dag_config_3.yaml # DAG 3 정의
│── tasks/
│ ├── task_definitions.py # PythonOperator에서 실행할 함수 정의
default_args:
owner: "airflow"
start_date: "2024-01-01"
retries: 1
dags:
example_dag_1:
schedule_interval: "@daily"
tasks:
task_1:
operator: "airflow.operators.bash.BashOperator"
bash_command: "echo 'Hello from DAG 1'"
task_2:
operator: "airflow.operators.python.PythonOperator"
python_callable_name: "my_python_task"
python_callable_file: "tasks/task_definitions.py"
default_args:
owner: "airflow"
start_date: "2024-01-01"
dags:
example_dag_2:
schedule_interval: "@hourly"
tasks:
task_1:
operator: "airflow.operators.bash.BashOperator"
bash_command: "echo 'Hello from DAG 2'"
default_args:
owner: "airflow"
start_date: "2024-01-01"
dags:
example_dag_3:
schedule_interval: "0 12 * * *"
tasks:
task_1:
operator: "airflow.operators.bash.BashOperator"
bash_command: "echo 'DAG 3 runs at noon!'"
def my_python_task():
print("Hello from Python Task!")
import dagfactory
# 여러 개의 YAML 파일을 로드
yaml_files = [
"/opt/airflow/config/dag_config_1.yaml",
"/opt/airflow/config/dag_config_2.yaml",
"/opt/airflow/config/dag_config_3.yaml"
]
# 각 YAML 파일을 로드해서 DAG 생성
for yaml_file in yaml_files:
dag_factory = dagfactory.DagFactory(yaml_file)
dag_factory.generate_dags(globals()) # Airflow에서 DAG을 자동 인식
YAML 을 파싱하는 코드를 만들어 DAG 를 생성하는 방법