Airflow DAG 선언하기

Alan·2023년 3월 22일
0

Airflow 맛보기

목록 보기
3/7

Airflow 설치시, 기본으로 구성된 turorial DAG를 살펴보자

from __future__ import annotations

# [START tutorial]
# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]


# [START instantiate_dag]
with DAG(
    "tutorial",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    # [START basic_task]
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    # [END basic_task]

    # [START documentation]
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

    # [START jinja_template]
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    # [END jinja_template]

    t1 >> [t2, t3]
# [END tutorial]

DAG 선언

  • 하나의 Python파일로 DAG를 선언

  • 이때, task가 구동되는 context와 script가 선언된 context가 다를 수 있음에 주의

  • 서로 다른 worker는 상호 간 통신이 불가능하지만, 이를 지원하기 위한 XComs라는 기능이 존재

  • 스케줄러는 Python DAG파일을 바탕으로 변경사항을 반영. 하지만, DAG의 양이나 airflow-scheduler의 리소스 등의 환경적 요인들로 인해 수 초 ~ 수 분까지 반영되지 않을 수 있음

Import Modules

  • Workflow Pipeline은 DAG 개체를 정의하는 Python Script.

  • 필요한 라이브러리를 가져와야 하며, 해당 라이브러리는 airflow가 동작하는 모든 머신의 Python 경로에 설치되어 있어야 함

Default Arguments

default_args={
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
},
  • 하나의 DAG에 여러 개의 Task를 정의한다면, 각 Task 생성자에게 인수 집합을 명시적으로 전달할 수도 있고, Task를 만들 때 사용 가능한 기본 매개 변수의 dictionary를 정의할 수 있음

  • 어떤 task에서 다른 arguments를 사용해야 한다면, 해당 task에서 명시적으로 선언해 override해서 사용

  • 주요 설정

    • depends_on_past

      • 이전 스케줄링된(DAG) task가 완료되어야 하는지 여부
      • DAG가 a → b → c 라면, 이전 스케줄링의 a가 완료되어야 현재 스케줄링의 a가 실행될 수 있음
    • retries

      • 재시도 횟수(최소 1)
    • retry_delay

      • 실패시 재시도까지 기다리는 시간
    • queue

      • 해당 task를 넣을 queue, 설정하지 않으면 airflow.cfg의 default queue로 설정

      • worker 노드를 특정하고 싶을 때 사용(worker 마다 서로 다른 queue를 구독하게 해서 분리)

    • wait_for_downstream

      • 이전 스케줄링된 task의 downstream 작업들이 완료되어야 하는지 여부
      • DAG가 a → b → c 라면, 이전 스케줄링의 a, b, c까지 모두 완료되어야 현재 스케줄링의 a가 실행될 수 있음
    • execution_timeout

      • 실행 시간의 timeout
      • 너무 오랫동안 hang 되어 있는 작업을 방지
    • {}_callback

      • {} 조건 하에서 수행될 callback 함수 등록
      • on_failure_callback에 nofify함수를 등록해 task 실패 시, notify할 수 있음
      • 이 밖에도 자원이나 변수 정리 등에 사용
    • task_concurrency

      • 같은 task에 대해 동시에 active run 상태를 허용하는 task 수준에서의 concurrency

0개의 댓글