섹션7: Trigger Dag Run 오퍼레이터

류홍규·2023년 8월 15일
0

airflow

목록 보기
7/18
post-thumbnail

1. DAG 간 의존관계 설정

DAG 의존관계 설정 방법

(1) TriggerDagRun 오퍼레이터

  • 파라미터에 어떤 DAG을 Trigger를 할지 dag_id를 넣어주면 된다.

(2) ExternalTask 센서


비교TriggerDagRun 오퍼레이터ExternalTask 센서
방식실행할 다른 DAG의 ID를 지정하여 수행한다.본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후에 수행된다.
권고 적용시점Trigger 되는 DAG의 선행 DAG가 하나만 있을 경우Trigger 되는 DAG의 선행 DAG가 2개 이상인 경우

Airflow_trigger_dag_run 오퍼레이터 가이드

  • trigger_dag_id (str) – The dag_id to trigger (templated).
    - 어떤 dag를 trigger할 것인지 명시해주는 파라미터
    - Triggers a DAG run for a specified dag_id. (필수값)
  • trigger_run_id (str | None) – The run ID to use for the triggered DAG run (templated). If not provided, a run ID will be automatically generated.
    - Run_id 값 직접 지정함
  • execution_date
    - 값을 넣으면, manual(수동실행)으로 간주되어, manual__{{execution_date}}로 수행된다.
  • reset_dag_run (bool) – Whether clear existing dag run if already exists. This is useful when backfill or rerun an existing dag run. This only resets (not recreates) the dag run.
    - 이미 run_id값이 있는 경우에도 다시 재수행할 것인지?

t1 -> t2 -> t3
-> C

만약, t2가 C를 Success로 완료하고, t2가 Success로 완료된 후에 t3를 trigger하고 싶다면, 아래와 같은 파라미터를 쓴다.


  • wait_for_completion (bool) – Whether or not wait for dag run completion. (default: False)
    - 값을 True로 주면, 해당 dag이 Success(완료)된 시점에 t3가 실행되게 된다.
  • poke_interval (int) – Poke interval to check dag run status when wait_for_completion=True. (default: 60)
    - C라는 선행 Dag이 완료가 되었는지 확인하기 위한 주기(Period)
  • allowed_states (list | None) – List of allowed states, default is ('success').
    - t2라는 Dag이 Success라는 것으로 되려면, C라는 Dag가 어떤 상태로 끝나야하는지?
  • failed_states (list | None) – List of failed or dis-allowed states, default is None.
    - t2라는 Dag이 Fail로 되려면, C라는 Dag가 어떤 상태로 끝나야 하는지?

TriggerDagRun 오퍼레이터 _ run_id란?

  • DAG간 수행방식과 시간을 유일하게 식별해주는 키
  • 같은 시간이라 해도 수행방식(Schedule, manual(수동), Backfill)에 따라 키가 달라진다.
  • 스케줄에 의해 실행된 경우 Scheduled__{{data_interval_start}} 값을 가진다.

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="dags_trigger_dag_run_operator",
    start_date=pendulum.datetime(2023, 8, 1, tz="Asia/Seoul"),
    schedule="30 9 * * *",
    catchup=False
) as dag:
    
    start_task = BashOperator(
        task_id="start_task",
        bash_command='echo "start!"',
    )
    trigger_dag_task = TriggerDagRunOperator(
        task_id='trigger_dag_task',
        trigger_dag_id='dags_python_operator',
        trigger_run_id=None,
        execution_date='{{data_interval_start}}',
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=['Success'],
        failed_states=None
    )

start_task >> trigger_dag_task

PythonOperator는 매일 6시 30분마다 Trigger되는 Dag이다.
-> 이 Dag를 Manual로 하게 되면, execution_date는 manual__2023-08-15T12:37:45.251160+00:00 로 나오게 된다. 이때, 이 시간은 UTC 기준이므로 9시간을 더해주면, 현재 시간이 나온다. 현재 시간은 2023년 8월 15일 오전 9시 37분 45초이다.

trigger_dag_task = TriggerDagRunOperator(
        task_id='trigger_dag_task',
        trigger_dag_id='dags_python_operator',
        trigger_run_id=None,
        execution_date='{{data_interval_start}}',

그럼, 아까 만들어줬던 dags_trigger_dag_run_operator로 가서 Data_interval_start를 확인해보자.

매일 오전 9시 30분으로 schedule를 설정했기 때문에, Data_interval_start는 데이터 관점에서 이전에 수행되었던 날짜인 8월 14일 오전 9시 30분이 나왔다. 가장 최근에 수행된 날짜는 Data_interval_end로 8월 15일 오전 9시 30분이다.

  • 해당 날짜는 trigger가 된, dags_python_operator에 그대로 전달된다.

    Run_id를 확인해보면, manual__2023-08-14T00:30:00+00:00 임을 확인할 수 있다.
profile
공대생의 코딩 정복기

0개의 댓글