[Airflow] 03. DAG 구성하기 (2)

Jay Park·2021년 11월 22일
0

DAG 두 번째

(continued)

DAG 할당 (Assignment)

모든 오퍼레이터와 태스크들은 DAG에 할당되어야만 실행이 가능하다.
Airflow는 명시적으로 전달하지 않더라도 DAG을 추정할 수 있는 몇 가지 방법을 제공한다.

  • with DAG 내에서 오퍼레이터를 선언하는 경우
  • @dag 데코레이터 내에서 오퍼레이터를 선언하는 경우
  • DAG이 지정된 오퍼레이터를 업스트림이나 다운스트림으로 지정하는 경우

명시적으로는 dag= 인자에 DAG을 지정한 후 각 오퍼레이터에 전달해야 한다.

기본 인자

DAG 안의 모든 오퍼레이터가 공통적으로 가져야 하는 인자들은 default_args에 담겨져 전달된다.

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'airflow'
}

with DAG('my_dag', default_args=default_args) as dag:
    op = DummyOperator(task_id='dummy')
    print(op.owner)  # "airflow"

제어 흐름 (Control Flow)

기본적으로 하나의 태스크는 그것이 의존하는 모든 태스크들이 성공했을 때에만 실행된다. 하지만, 이러한 기본 동작을 변경할 수 있는 방법들이 존재한다.

  • Branching
  • Latest Only
  • Depends On Past
  • Trigger Rules

Branching

DAG 에게 의존하는 모든 태스크들을 실행하는 대신 하나의 경로를 선택해서 내려가라고 할 수 있다.

[주의] 태스크가 브랜칭 오퍼레이터의 다운스트림이면서 선택된 태스크들 중 하나의 다운스트림일 경우 건너뛰지 않는다.

위 그림에서 브랜칭 태스크는 branch_a, join 그리고 branch_이다. 여기서 joinbranch_a의 다운스트림이므로 브랜치가 결정되는 과정에서 선택되지 않았음에도 실행될 것이다.

branching 태스크

여기서 브랜칭 오퍼레이터가 등장한다. BranchPythonOperatorpython_callable 에 Callback 함수를 지정한다. DAG은 Callback 함수에서 리턴되는 태스크의 이름을 따라 실행되며 나머지 경로들은 무시된다.

def branch_func(ti):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    else:
        return "stop_task"

start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    xcom_push=True,
    dag=dag,
)

branch_op = BranchPythonOperator(
    task_id="branch_task",
    python_callable=branch_func,
    dag=dag,
)

continue_op = DummyOperator(task_id="continue_task", dag=dag)
stop_op = DummyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

이 예시는 xcom_value의 값에 따라 서로 다른 태스크를 실행하게 된다.

Latest Only

import datetime as dt

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2021, 1, 1),
    catchup=False,
    tags=['example3'],
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = DummyOperator(task_id='task1')
    task2 = DummyOperator(task_id='task2')
    task3 = DummyOperator(task_id='task3')
    task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

위의 예시에서:

  • task1 is directly downstream of latest_only and will be skipped for all runs except the latest.
  • task2 is entirely independent of latest_only and will run in all scheduled periods
  • task3 is downstream of task1 and task2 and because of the default trigger rule being all_success will receive a cascaded skip from task1.
  • task4 is downstream of task1 and task2, but it will not be skipped, since its trigger_rule is set to all_done.

Depends On Past

이전 DAG Ruun 에서 해당 태스크가 성공한 경우에만 실행하기를 원할 경우 태스크의 depends_on_past 인자를 True로 설정한다.

Trigger Rules

기본적으로 Airflow는 업스트림 태스크들이 모두 성공하기를 기다렸다가 해당 태스크를 실행한다.

  • all_success (default): All upstream tasks have succeeded
  • all_failed: All upstream tasks are in a failed or upstream_failed state
  • all_done: All upstream tasks are done with their execution
  • one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
  • one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
  • none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
  • none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
  • none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
  • always: No dependencies at all, run this task at any time

[주의] 트리거 규칙과 스킵된 태스크들간의 상호작용을 이해하는 것이 중요하다. 특히 브랜칭 작업간에 스킵된 태스크들에서는 더욱 그러하다.
all_successall_failed인 브랜칭 연산의 다운스트림을 사용하고 싶지는 않을 것이기 때문이다.

Skipped tasks will cascade through trigger rules all_success and all_failed, and cause them to skip as well. Consider the following DAG:

# dags/branch_without_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

dag = DAG(
    dag_id="branch_without_trigger",
    schedule_interval="@once",
    start_date=dt.datetime(2019, 2, 28),
)

run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
branching = BranchPythonOperator(
    task_id="branching", dag=dag, python_callable=lambda: "branch_a"
)

branch_a = DummyOperator(task_id="branch_a", dag=dag)
follow_branch_a = DummyOperator(task_id="follow_branch_a", dag=dag)

branch_false = DummyOperator(task_id="branch_false", dag=dag)

join = DummyOperator(task_id="join", dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

joinfollow_branch_abranch_false의 다운스트림이다.. join 태스크는 스킵된 것으로 보여질 것이다. 왜냐하면 trigger_ruleall_success로 기본설정되어 있어서이다. 그리고 브랜칭 연산에 의해서 초래된 스킵은 따라 내려가면서 all_success로 표시된 태스크들을 건너뛴다.

join 태스크의 trigger_rulenone_failed_min_one_success로 설정하면, 대신에 의도된 결과를 얻을 수 있다.

참고자료

profile
Jaytiger

0개의 댓글