DAG Factory로 Airflow DAG 생성하기

모건레옹·2025년 3월 20일
1
post-thumbnail

⚠️ 서론 - 문제점 파악 및 해결 방안찾기

  • 문제점: Airflow DAG 를 만드는데 불편하다..
    • 예시 1: 비슷한 DAG 를 계속해서 만든다
    • 예시 2: 비슷한 DAG 를 계속 복사 및 붙여넣기를 하다보니 실수를 할 때가 발생한다.
    • 예시 3: 설정 파일로 관리하고 싶다.
  • 위 문제를 해결하려고 생각해보니 DAG 를 동적으로 만들 수 밖에 없다.

DAG Factory - 디자인 패턴을 직접 구현하는 방법

동적으로 DAG 를 만드는 방법 중에 디자인 패턴의 팩토리 패턴의 방법을 참고해서 만들 수 있다.
Factory 패턴을 이용해서 DAG 를 만드는 방식은 아래와 같다.

  • DAG 메타데이터를 관리하는 데이터베이스 혹은 CSV 를 활용해서 DAG 를 생성하는 파일을 만든다면, 동적으로 DAG 생성을 진행하도록 만들 수 있다.

📂 디렉토리 구조

airflow_dags/
│── dags/
│   ├── my_dags.py  # Airflow가 인식할 DAG 파일
│── dag_factory/
│   ├── factory.py  # DAG을 생성하는 팩토리 함수 정의
│   ├── task_definitions.py  # 개별 태스크 정의

💻 팩토리 함수 정의 (factory.py)

from airflow import DAG
from airflow.operators.python import PythonOperator

def create_dag(dag_id, schedule, default_args, task_configs):
    """
    팩토리 패턴을 활용하여 DAG을 생성하는 함수
    """
    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    
    with dag:
        for task_id, task_callable in task_configs.items():
            PythonOperator(
                task_id=task_id,
                python_callable=task_callable,
                dag=dag
            )
    
    return dag

💻 태스크 정의 (task_definitions.py)

def sample_task():
    print("Hello, Airflow!")

# 여러 DAG에서 동일한 태스크를 사용
task_configs = {
    "task_1": sample_task,
    "task_2": sample_task
}

💻 DAG 10개 생성 (my_dags.py)


from datetime import datetime
from dag_factory.factory import create_dag
from dag_factory.task_definitions import task_configs

default_args = {"owner": "airflow", "start_date": datetime(2024, 1, 1)}

# 10개의 DAG을 생성
for i in range(1, 11):
    dag_id = f"factory_generated_dag_{i}"  # DAG ID 동적 생성
    schedule = "@daily"  # 모든 DAG의 스케줄 동일

    globals()[dag_id] = create_dag(dag_id, schedule, default_args, task_configs)

DAG Factory - 오픈소스

  • 위와 같이 DAG Factory 를 직접 구현하기 번거롭게 느껴진다면 오픈소스로 만들어진 DAG Factory 를 이용할 수 있다.
    • DAG Factory
    • 관리형 Airflow 서비스하는 회사인 Astronomer 에서 개발한 오픈소스이다.
    • 사용방법은 Getting Started를 참고하였다.

📂 디렉토리 구조

airflow_dags/
│── dags/
│   ├── generate_dags.py  # DAG을 Airflow에 로드하는 Python 파일
│── config/
│   ├── dag_config_1.yaml  # DAG 1 정의
│   ├── dag_config_2.yaml  # DAG 2 정의
│   ├── dag_config_3.yaml  # DAG 3 정의
│── tasks/
│   ├── task_definitions.py  # PythonOperator에서 실행할 함수 정의

💻 dag_config_1.yaml

default_args:
  owner: "airflow"
  start_date: "2024-01-01"
  retries: 1

dags:
  example_dag_1:
    schedule_interval: "@daily"
    tasks:
      task_1:
        operator: "airflow.operators.bash.BashOperator"
        bash_command: "echo 'Hello from DAG 1'"
      task_2:
        operator: "airflow.operators.python.PythonOperator"
        python_callable_name: "my_python_task"
        python_callable_file: "tasks/task_definitions.py"

💻 dag_config_2.yaml

default_args:
  owner: "airflow"
  start_date: "2024-01-01"

dags:
  example_dag_2:
    schedule_interval: "@hourly"
    tasks:
      task_1:
        operator: "airflow.operators.bash.BashOperator"
        bash_command: "echo 'Hello from DAG 2'"

💻 dag_config_3.yaml

default_args:
  owner: "airflow"
  start_date: "2024-01-01"

dags:
  example_dag_3:
    schedule_interval: "0 12 * * *"
    tasks:
      task_1:
        operator: "airflow.operators.bash.BashOperator"
        bash_command: "echo 'DAG 3 runs at noon!'"

💻 Python 태스크 정의 (task_definitions.py)

  • example_dag_1 에서 사용할 my_python_task 함수 정의
def my_python_task():
    print("Hello from Python Task!")

💻 DAG을 로드하는 Python 코드 (generate_dags.py)

import dagfactory

# 여러 개의 YAML 파일을 로드
yaml_files = [
    "/opt/airflow/config/dag_config_1.yaml",
    "/opt/airflow/config/dag_config_2.yaml",
    "/opt/airflow/config/dag_config_3.yaml"
]

# 각 YAML 파일을 로드해서 DAG 생성
for yaml_file in yaml_files:
    dag_factory = dagfactory.DagFactory(yaml_file)
    dag_factory.generate_dags(globals())  # Airflow에서 DAG을 자동 인식
  • globals() 를 사용하는 이유는 Airflow 에서 DAG 를 전역적으로 인식시키기 위함

만약 다음과 같은 상황이라면.. DAG Factory 사용을 고려해봐도 좋다

  • DAG 스크립트의 경우, 개발자들의 역량에 맡기게 되고 그러다보면 개발자 나름대로의 결과물들을 생성하게 된다.
  • 초기 케이스 연구가 잘 진행되었거나 어느정도 운영을 진행하다보면 비슷한 비즈니스 기반으로 DAG 생성을 요청받게 되는 경우가 생긴다.
  • 협업을 진행하며서 서로 코멘트를 남기고 수정을 진행하겠지만 매번 코멘트를 다는 노력이 필요하고, 언젠간 사람 모두가 실수하는 경우가 생길 수 있다.
  • DAG 로 만들어지는 USE CASE 를 정리할 수 있다.

참고자료

YAML 을 파싱하는 코드를 만들어 DAG 를 생성하는 방법

2025년 3월 나는 이럴 때 이 내용을 찾아봤습니다.

  • 회사를 두개 거쳐오면서 Airflow를 사용하고 있다.(약 3년)
  • 각 회사들이 Airflow 를 전문적으로 잘 사용하고 있다고 말할 순 없을 것 같다.
    • DAG 는 약 50개, Task 는 최대 10개 정도로 그리 많지 않은 DAG 와 TASK 의 수
    • 자체 커스터마이징을 진행한 Operator 나 Sensor 는 사용하지 않는다.
    • PythonOperator, GlueOperator, DatabricksRunNowOperator, ExternalTaskSensor 등 사용하기 쉬운 Task 를 개발하여 사용하고 있다.
profile
데이터 엔지니어 ㅎ_ㅎ

0개의 댓글