(continued)
모든 오퍼레이터와 태스크들은 DAG에 할당되어야만 실행이 가능하다.
Airflow는 명시적으로 전달하지 않더라도 DAG을 추정할 수 있는 몇 가지 방법을 제공한다.
with DAG
내에서 오퍼레이터를 선언하는 경우@dag
데코레이터 내에서 오퍼레이터를 선언하는 경우명시적으로는 dag=
인자에 DAG을 지정한 후 각 오퍼레이터에 전달해야 한다.
DAG 안의 모든 오퍼레이터가 공통적으로 가져야 하는 인자들은 default_args
에 담겨져 전달된다.
default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'airflow'
}
with DAG('my_dag', default_args=default_args) as dag:
op = DummyOperator(task_id='dummy')
print(op.owner) # "airflow"
기본적으로 하나의 태스크는 그것이 의존하는 모든 태스크들이 성공했을 때에만 실행된다. 하지만, 이러한 기본 동작을 변경할 수 있는 방법들이 존재한다.
DAG 에게 의존하는 모든 태스크들을 실행하는 대신 하나의 경로를 선택해서 내려가라고 할 수 있다.
[주의] 태스크가 브랜칭 오퍼레이터의 다운스트림이면서 선택된 태스크들 중 하나의 다운스트림일 경우 건너뛰지 않는다.
위 그림에서 브랜칭 태스크는 branch_a
, join
그리고 branch_
이다. 여기서 join
은 branch_a
의 다운스트림이므로 브랜치가 결정되는 과정에서 선택되지 않았음에도 실행될 것이다.
branching
태스크
여기서 브랜칭 오퍼레이터가 등장한다. BranchPythonOperator
는 python_callable
에 Callback 함수를 지정한다. DAG은 Callback 함수에서 리턴되는 태스크의 이름을 따라 실행되며 나머지 경로들은 무시된다.
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
else:
return "stop_task"
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
xcom_push=True,
dag=dag,
)
branch_op = BranchPythonOperator(
task_id="branch_task",
python_callable=branch_func,
dag=dag,
)
continue_op = DummyOperator(task_id="continue_task", dag=dag)
stop_op = DummyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
이 예시는 xcom_value
의 값에 따라 서로 다른 태스크를 실행하게 된다.
import datetime as dt
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=dt.datetime(2021, 1, 1),
catchup=False,
tags=['example3'],
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task3 = DummyOperator(task_id='task3')
task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
위의 예시에서:
이전 DAG Ruun 에서 해당 태스크가 성공한 경우에만 실행하기를 원할 경우 태스크의 depends_on_past
인자를 True
로 설정한다.
기본적으로 Airflow는 업스트림 태스크들이 모두 성공하기를 기다렸다가 해당 태스크를 실행한다.
[주의] 트리거 규칙과 스킵된 태스크들간의 상호작용을 이해하는 것이 중요하다. 특히 브랜칭 작업간에 스킵된 태스크들에서는 더욱 그러하다.
all_success
나 all_failed
인 브랜칭 연산의 다운스트림을 사용하고 싶지는 않을 것이기 때문이다.
Skipped tasks will cascade through trigger rules all_success and all_failed, and cause them to skip as well. Consider the following DAG:
# dags/branch_without_trigger.py
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule_interval="@once",
start_date=dt.datetime(2019, 2, 28),
)
run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
branching = BranchPythonOperator(
task_id="branching", dag=dag, python_callable=lambda: "branch_a"
)
branch_a = DummyOperator(task_id="branch_a", dag=dag)
follow_branch_a = DummyOperator(task_id="follow_branch_a", dag=dag)
branch_false = DummyOperator(task_id="branch_false", dag=dag)
join = DummyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
join
은 follow_branch_a
와 branch_false
의 다운스트림이다.. join
태스크는 스킵된 것으로 보여질 것이다. 왜냐하면 trigger_rule
이 all_success
로 기본설정되어 있어서이다. 그리고 브랜칭 연산에 의해서 초래된 스킵은 따라 내려가면서 all_success
로 표시된 태스크들을 건너뛴다.
join
태스크의 trigger_rule
을 none_failed_min_one_success
로 설정하면, 대신에 의도된 결과를 얻을 수 있다.