Airflow Study17- Trigger Rule

박성현·2024년 6월 8일
0

Airflow

목록 보기
24/28
post-thumbnail

Trigger란 ?
앞서 공부한 Branch 와 반대로, 하위 task가 동작하기 위한 룰로 이해

Airflow Trigger Rule 정리

Trigger Rule 설명
all_success (기본값) 상위 Task가 모두 성공하면 실행
all_failed 상위 Task가 모두 실패하면 실행
all_done 상위 Task가 모두 수행되면 실행 (실패도 수행된 것에 포함)
all_skipped 상위 Task가 모두 Skipped 상태면 실행
one_failed 상위 Task 중 하나 이상 실패하면 실행 (모든 상위 Task 완료를 기다리지 않음)
one_success 상위 Task 중 하나 이상 성공하면 실행 (모든 상위 Task 완료를 기다리지 않음)
one_done 상위 Task 중 하나 이상 성공 또는 실패하면 실행
none_failed 상위 Task 중 실패가 없는 경우 실행 (성공 또는 Skipped 상태)
none_failed_min_one_success 상위 Task 중 실패가 없고 성공한 Task가 적어도 1개 이상이면 실행
none_skipped Skip된 상위 Task가 없으면 실행 (상위 Task가 성공, 실패여도 무방)
always 언제나 실행

all_success vs none_failed
: skipped여부 차이


실습1 ( all_done)

상위 Task가 모두 수행되면 실행 (실패도 수행된 것에 포함)
설정 : [1,2,3] >> 4 ,
(1,3) 성공, 2 fail , all_done이니 성공/실패 상관없이 skip만 없으면 수행

bash_upstream_1 = BashOperator(
        task_id = 'bash_upstream_1',
        bash_command='echo upstream1'
    )
    
    @task(task_id = 'python_upstream_1')
    def python_upstream_1():
        raise AirflowException('downstream_1 Exception! ')
    
    @task(task_id ='python_upstream_2')
    def python_upstream_2():
        print('2번 정상처리')
        
    @task(task_id = 'python_downstream_1',trigger_rule='all_done')
    def python_downstream_1():
        print('down 정상처리 ')
        
    
    [bash_upstream_1,python_upstream_1(),python_upstream_2()] >> python_downstream_1()


실습2 ( none-skipped )

Skip된 상위 Task가 없으면 실행 (상위 Task가 성공, 실패여도 무방)
설정 : branch에 의해 1,2,3중 하나만 선택되고 나머진 2개 스킵, downstream task수행여부 안됨

    @task.branch(task_id = 'branch_task1')
    def select_random():
        from random import choice
        lst = ['1','2','3']
        item = choice(lst)
        
        if item == '1':
            return 'upstream_task_1'
        elif item =='2':
            return 'upstream_task_2'
        else :
            return 'upstream_task_3'
        
    @task(task_id = 'upstream_task_1')
    def python_upstream_1():
        print('upstream_task_1')
    
    @task(task_id = 'upstream_task_2')
    def python_upstream_2():
        print('upstream_task_2')
    
    @task(task_id = 'upstream_task_3')
    def python_upstream_3():
        print('upstream_task_3')
    
    
    @task(task_id = 'downstream_task_1', trigger_rule = 'none_skipped')
    def python_downstream_1():
        print('downstream_task_1')
    
    
    select_random() >> [python_upstream_1(), python_upstream_2(),python_upstream_3()] >> python_downstream_1()
    

profile
다소Good한 데이터 엔지니어

0개의 댓글