(1) TriggerDagRun 오퍼레이터
(2) ExternalTask 센서
비교 | TriggerDagRun 오퍼레이터 | ExternalTask 센서 |
---|---|---|
방식 | 실행할 다른 DAG의 ID를 지정하여 수행한다. | 본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후에 수행된다. |
권고 적용시점 | Trigger 되는 DAG의 선행 DAG가 하나만 있을 경우 | Trigger 되는 DAG의 선행 DAG가 2개 이상인 경우 |
Airflow_trigger_dag_run 오퍼레이터 가이드
t1 -> t2 -> t3
-> C
만약, t2가 C를 Success로 완료하고, t2가 Success로 완료된 후에 t3를 trigger하고 싶다면, 아래와 같은 파라미터를 쓴다.
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id="dags_trigger_dag_run_operator",
start_date=pendulum.datetime(2023, 8, 1, tz="Asia/Seoul"),
schedule="30 9 * * *",
catchup=False
) as dag:
start_task = BashOperator(
task_id="start_task",
bash_command='echo "start!"',
)
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task',
trigger_dag_id='dags_python_operator',
trigger_run_id=None,
execution_date='{{data_interval_start}}',
reset_dag_run=True,
wait_for_completion=False,
poke_interval=60,
allowed_states=['Success'],
failed_states=None
)
start_task >> trigger_dag_task
PythonOperator는 매일 6시 30분마다 Trigger되는 Dag이다.
-> 이 Dag를 Manual로 하게 되면, execution_date는 manual__2023-08-15T12:37:45.251160+00:00
로 나오게 된다. 이때, 이 시간은 UTC 기준이므로 9시간을 더해주면, 현재 시간이 나온다. 현재 시간은 2023년 8월 15일 오전 9시 37분 45초이다.
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task',
trigger_dag_id='dags_python_operator',
trigger_run_id=None,
execution_date='{{data_interval_start}}',
그럼, 아까 만들어줬던 dags_trigger_dag_run_operator로 가서 Data_interval_start를 확인해보자.
매일 오전 9시 30분으로 schedule를 설정했기 때문에, Data_interval_start는 데이터 관점에서 이전에 수행되었던 날짜인 8월 14일 오전 9시 30분이 나왔다. 가장 최근에 수행된 날짜는 Data_interval_end로 8월 15일 오전 9시 30분이다.
dags_python_operator
에 그대로 전달된다.manual__2023-08-14T00:30:00+00:00
임을 확인할 수 있다.