[Airflow] DAGs

2022년 8월 15일
  • Aiflow의 Task를 모으는 core concept이다.

  • Tasks는 A,B,C,D 이다. run 시 해당 task 들은 의존성을 갖고 순서대로 수행된다.
  • 내일부터 5분 마다, 새해 첫날 마다 등으로 수행할 수도 있다.
  • DAG는 task에서 어떤 작업을 수행하는지 신경쓰지 안흔ㄴ다. 오직 어떻게 그것들이 실행될 것인지에만 신경쓴다.
    • 어떤 순서로, 얼마나 재수행할지, timeout은 등

Declaring a DAG

  • DAG를 declaring 하는 세가지 방법이있다.
with DAG(
    "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily", catchup=False
) as dag:
    op = EmptyOperator(task_id="task")
  • context manager 방법
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
             schedule_interval="@daily", catchup=False)
op = EmptyOperator(task_id="task", dag=my_dag)
  • standard 생성자를 사용하는 방법
@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_interval="@daily", catchup=False)
def generate_dag():
    op = EmptyOperator(task_id="task")

dag = generate_dag()
  • @dag를 통한 방법
    • DAG generator로 function을 turon 해준다.

Task Dependencies

first_task >> [second_task, third_task]
third_task << fourth_task
  • Task/Operator는 보통 혼자 있지않고, 서로 의존성을 가진다.
  • Task들 사이 의존성을 선언하는 방법은 DAG 구조로 만드는 것이다.
first_task.set_downstream(second_task, third_task)
  • 해당 method를 통해서 명시적으로 적용하는 방법도 있다.
from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
  • cross_downstream이라는 shortcut으 사용하면 위의 task를 쉽게 대체할 수 있다.
from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])
  • chain을 사용해서 의존성을 함께 묶는 shortcut도 있다.
from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
  • pair wise 의존성도 쌉가능 !

Loading DAGs

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

  • aiflow는 python source 파일에서 DAGs를 load 한다. 이는 DAG_FOLDER에 설정되어있다.
  • 이는 해당 파일로 어떠한 DAG 객체도 load 하고 실행할 수 있다는 것을 의미한다.
dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

  • load되는 순서에 유의해야되는데, 위처럼 코드를 작성했을 경우 dag_1이 적용되고 dag_2는 load되지 않는다. dag_1이 top level에 있는 것이다.

Running DAGs

  • 2가지 방법이 있다.
    • API로 수동으로 triggering
    • schdule을 정의
with DAG("my_daily_dag", schedule_interval="@daily"):
  • schedule_interval argument를 통해 스케줄을 정의할 수 있다.
with DAG("my_daily_dag", schedule_interval="0 * * * *"):
  • Crontab 스케줄 값을 넣어주면된다.

DAG Assignment

  • 명시적으로 Task/Operator를 pass 하지 않아도 되는몇가지 방법이 있다.
    • with DAG block안에 Operator를 선언
    • @dag decordator로 Operator 선언
    • DAG를 갖고 있는 Operator의 upstream, downstream에 Operator를 넣는다.

Default Arguments

  • retires와 같은 default argument는 대부분 공통 set으로 많이 쓴다.
  • defulat_ags를 통해 이러한 과정을 진행할 수 있다.
import pendulum

with DAG(
    start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
    default_args={'retries': 2},
) as dag:
    op = BashOperator(task_id='dummy', bash_command='Hello World!')
    print(op.retries)  # 2

The DAG decorator

    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
def example_dag_decorator(email: str = 'example@example.com'):
    DAG to send server IP to email.

    :param email: Email to send IP to. Defaults to example@example.com.
    get_ip = GetRequestOperator(task_id='get_ip', url="http://httpbin.org/get")

    def prepare_email(raw_json: Dict[str, Any]) -> Dict[str, str]:
        external_ip = raw_json['origin']
        return {
            'subject': f'Server connected from {external_ip}',
            'body': f'Seems like today your server executing Airflow is connected from IP {external_ip}<br>',

    email_info = prepare_email(get_ip.output)

        task_id='send_email', to=email, subject=email_info['subject'], html_content=email_info['body']

dag = example_dag_decorator()
  • @dag decordator는 DAG generator function으로 바꿔준다.
  • 코드가 깔끔해지고 DAG 파라미터로 function에 setup 해줄 수 도 있다.
  • Jinja template을 통해 Python Code나 {{ }} 형태로 파라미터를 접근할 수 있게된다.

Control Flow

  • Branching
    • 선택한 Task를 상황에 따라 이동시킬 수 있다.
  • Latest Only
    • ??
  • Depnds On Past
    • task가 이전 run에 의존하도록 해준다.
  • Trigger Rules
    • DAG가 run할 Taskㄹ르 상황에 따라 지정해준다.


  • 모든 곳에 의존성이 없는 task를 branching 할 수 있다.
def branch_func(ti):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
        return None

start_op = BashOperator(
    bash_command="echo 5",

branch_op = BranchPythonOperator(

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

