trigger rule

yoon__0_0·2024년 6월 17일
0

이어드림 수업

목록 보기
66/103

trigger rule

  • 상위 task의 상태에 따라 하위 task의 동작을 결정하고 싶을때 적용할 수 있는 파라미터
  • 하위 task 에 파라미터를 지정하여 동작 제어
  • 하위 task에게 주는 것
  • 기본적으로는 all_success로 되어있기 때문에 상위가 잘 돌아야 하위가 돌 수 있는 것

종류

실습

1) all_done

  • bash_upstream_1 : 성공
  • python_upstream_1 : 무조건 fail
  • python_upstream_2 : 성공
  • python_downstream_1 : trigger를 주고 모두 실행이 되면 돌게 해라.
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException

import pendulum

with DAG(
    dag_id='dags_python_with_trigger_rule_eg1',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:
    bash_upstream_1 = BashOperator(
        task_id='bash_upstream_1',
        bash_command='echo upstream1'
    )

    @task(task_id='python_upstream_1')
    def python_upstream_1():
        raise AirflowException('downstream_1 Exception!')

    @task(task_id='python_upstream_2')
    def python_upstream_2():
        print('정상 처리')
	
 	# all_done : 성공이건 실패건 돈다 
    @task(task_id='python_downstream_1', trigger_rule='all_done')
    def python_downstream_1():
        print('정상 처리')

    [bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()
  • 결과
    • 한개가 실패해도 downstream 은 실행되는 것을 알 수 있음.

2) none_skipped

  • a,b,c 중 하나만 선택해서 선택된 task만 돌리기
  • skip 된것이 없어야만 실행되도록 만들기 (none_skipped 사용)
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

import pendulum

with DAG(
    dag_id='dags_python_with_trigger_rule_eg2',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:
    @task.branch(task_id='branching')
    def random_branch():
        import random
        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item == 'B':
            return 'task_b'
        elif selected_item == 'C':
            return 'task_c'

    task_a = BashOperator(
        task_id='task_a',
        bash_command='echo upstream1'
    )

    @task(task_id='task_b')
    def task_b():
        print('정상 처리')

    @task(task_id='task_c')
    def task_c():
        print('정상 처리')

    @task(task_id='task_d', trigger_rule='none_skipped')
    def task_d():
        print('정상 처리')

    random_branch() >> [task_a, task_b(), task_c()] >> task_d()
  • 결과
    • random으로 b가 선택되었음.
    • 결국 task_d()는 돌지 않음. : skipped 된 상태로 됨.
profile
신윤재입니다

0개의 댓글