[Airflow] Trigger Dag Run Operator

minyeamer·2025년 6월 7일
0

Apache Airflow 배우기

목록 보기
9/13
post-thumbnail

TriggerDagRunOperator

https://airflow.apache.org/docs/apache-airflow/2.3.4/_api/airflow/operators/trigger_dagrun/index.html

  • 다른 DAG을 실행시키는 Operator
  • 실행할 다른 DAG의 ID를 지정하여 수행
  • 선행 DAG이 하나만 있을 경우 TriggerDagRunOperator를 사용하고, 선행 DAG이 2개 이상인 경우는 ExternalTaskSensor를 사용 권장
trigger-dagrunexternal-task

run_id

  • DAG의 수행 방식과 시간을 유일하게 식별해주는 키
  • 수행 방식(Schedule, manual, Backfill)에 따라 키가 달라짐
  • 스케줄에 의해 실행된 경우 scheduled__{{data_interval_start}} 값을 가짐
    • 예시) scheduled__2025-06-01T00:00:00+00:00

TriggerDagRun 활용

  • trigger_run_id : DAG을 실행시킬 때 어떤 run_id 로 실행할지 지정 가능
  • logical_date : DAG이 트리거된 시간을 지정 가능, manual__{{logical_date}}
  • reset_dag_run : run_id 로 수행된 이력이 있어도 실행시키려면 True 로 설정
  • wait_for_completion : 지정한 DAG이 완료되어야 다음 Task를 실행하고 싶을 경우 True 로 설정
    • 기본적으로는 DAG의 완료 여부에 관계없이 success 로 빠져나가 다음 Task를 실행
  • poke_interval : 지정한 DAG이 완료되었는지 확인하는 주기
  • allowed_states : Task가 success 상태가 되기 위한 DAG의 처리 상태 목록
  • failed_states : Task가 failed 상태가 되기 위한 DAG의 처리 상태 목록
# dags/trigger_dagrun.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_dagrun",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "dagrun"],
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo \"start!\"",
    )

    trigger_dag_task = TriggerDagRunOperator(
        task_id="trigger_dag_task",
        trigger_dag_id="python_operator",
        trigger_run_id=None,
        logical_date="{{data_interval_start}}",
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=["success"],
        failed_states=None
        )

    start_task >> trigger_dag_task

DAG 실행

  • trigger_dag_task 의 실행 로그에서 python_operator 가 호출된 것을 확인
[2025-06-07, 10:52:54] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-07, 10:52:54] INFO - Filling up the DagBag from /opt/airflow/dags/trigger_dagrun.py: source="airflow.models.dagbag.DagBag"
[2025-06-07, 10:52:54] INFO - Triggering Dag Run.: trigger_dag_id="python_operator": source="task"
[2025-06-07, 10:52:54] INFO - Dag Run triggered successfully.: trigger_dag_id="python_operator": source="task"
  • 두 번째 이미지인 PythonOperator의 run_id 가 첫 번째 이미지인 TriggerDagRunOperator의 실행 시간과 같다는 것을 알 수 있으며, trigger_run_id 를 지정하지 않았기 때문에 manual 로 지정

trigger-dagrun-op

python-op

profile
데이터의 모든 것을 추구합니다.

0개의 댓글