Airflow Study20- TriggerDagRun Operator

박성현·2024년 6월 10일

Airflow

목록 보기
27/28

DAG 의존관계 설정 방법

1) TriggerDagRun Operator

다른 DAG을 실행시키는 오퍼레이터

2) External 센서

다른 DAG의 Task의 완료를 기다리는 센서


비교TriggerDagRun오퍼레이터ExternalTask센서
방식실행할 다른 DAG의 ID를 지정하여 수행본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행
권고사용시점Trigger 되는 DAG의 선행 DAG이 하나만 있을 경우Trigger되는 DAG의 선행 DAG이 2개 이상인 경우

쉽게 말해 선 후 관계가 1:1인경우 TriggerDagRun, N:1인 경우 Sensor사용


TriggerDagRun Operator 실습

trigger_dag_task = TriggerDagRunOperator(
	task_id='trigger_dag_task', 
    trigger_dag_id='dags_python_operator',
    trigger_run_id=None,
    execution_date='{{data_interval_start}}',
    reset_dag_run=True,
    wait_for_completion=False,
    poke_interval=60,
    allowed_states=['success'],
    failed_states=None
)
  1. task_id (필수):
    트리거 작업의 고유한 ID를 지정합니다. DAG 내에서 다른 task와 구별하기 위해 명확하고 의미 있는 이름을 사용하는 것이 좋습니다. 예시에서는 "trigger_dag_task"로 설정되어 있습니다.

  2. trigger_dag_id (필수):
    실행할 대상 DAG의 ID를 지정합니다. Airflow 웹 UI 또는 코드에서 확인할 수 있습니다. 예시에서는 "dags_python_operator" DAG를 트리거하도록 설정되어 있습니다.

  3. trigger_run_id (선택 사항):
    트리거된 DAG 실행에 대한 고유한 ID를 지정합니다. 기본값은 None이며, 이 경우 Airflow가 자동으로 UUID를 생성합니다. 특정 DAG 실행을 추적하거나 구분해야 할 때 유용합니다.

  4. execution_date (선택 사항):
    트리거되는 DAG의 실행 날짜 및 시간을 지정합니다. 기본값은 None이며, 이 경우 현재 시간을 사용합니다. Jinja 템플릿을 사용하여 동적으로 값을 설정할 수 있습니다. 예시에서는 {{ data_interval_start }}를 사용하여 현재 DAG의 데이터 간격 시작 시간을 사용합니다.

  5. reset_dag_run (선택 사항):
    True로 설정하면 트리거된 DAG의 기존 실행 기록을 재설정하고 새로 실행합니다. False (기본값)로 설정하면 기존 실행 기록을 유지하고, 이미 실행된 경우 다시 실행하지 않습니다.

  6. wait_for_completion (선택 사항):
    True로 설정하면 트리거된 DAG의 실행이 완료될 때까지 현재 task가 대기합니다. False (기본값)로 설정하면 트리거 후 바로 다음 task로 진행합니다.

  7. poke_interval (선택 사항):
    wait_for_completion=True인 경우 트리거된 DAG의 상태를 확인하는 간격(초)을 지정합니다. 기본값은 60초입니다.

  8. allowed_states (선택 사항):
    트리거된 DAG의 실행 상태가 이 목록에 포함된 경우에만 현재 task가 성공으로 표시됩니다. 기본값은 ['success']입니다. 예시에서는 "success" 상태만 허용하도록 설정되어 있습니다.

  9. failed_states (선택 사항):
    트리거된 DAG의 실행 상태가 이 목록에 포함된 경우 현재 task가 실패로 표시됩니다. 기본값은 None이며, 이 경우 모든 실패 상태가 고려됩니다.

필수값 :
task_id : 공통 기본 task_id
trigger_dag_id : 수행시킬 dag의 id

run_id :
dag의 수행 방식(schedule, manual, backfill )과, 시간을 유일하게 식별해주는 키
ex) scheduled
{{data_interval_start}} , 유의사항이 end가 아니고 start시간임 (실행시간x)

Airflow의 TriggerDagRunOperator는 기본적으로 트리거할 DAG가 unpause 상태일 때만 실행됩니다. 만약 트리거할 DAG가 pause 상태라면, TriggerDagRunOperator는 DAG 실행을 트리거하지 않고 건너뜁니다

profile
다소Good한 데이터 엔지니어

0개의 댓글