섹션6: BranchPython 오퍼레이터로 분기처리하기

류홍규·2023년 8월 13일
0

airflow

목록 보기
6/18
post-thumbnail

1. Task 분기 처리 방법

Task 분기처리, 왜 필요한가?

  • task1의 결과에 따라 task2-x 중 하나만 수행하도록 구성해야 할 때
    예) 상위에 있는 task1의 결과로 'GOOD', 'BAD', 'Pending'이라는 결과 3개 중 하나가 나오고 그에 따라 task2-1 ~ task 2-3 중 하나가 실행되도록 해야할 경우

Task 분기 처리 방법

  • 1) BrachPythonOperator
  • 2) task.branch 데커레이터 이용
  • 3) BaseBranchOperator 상속하여 직접 개발

2. BranchPythonOperator

  • 선행 task를 정의해준뒤, 후행 task를 python_callable에 함수로 적고, 리스트로 같은 레벨로 묶어주면 된다.
  • 주의해야할 점은, python_callable에 작성할 함수의 return값이 후행 task이름 이기 때문에 꼼꼼히 작성해야 한다.
  • 함수 작성시, 로직에서 두개 후행 task를 동시에 수행시키고 싶다면 리스트 형태로 후행 task의 이름을 전달해주면 된다.
  • 후행 task이름은 String 값으로 작성해주면 된다.

from airflow import DAG
import pendulum
from airflow.operators.python import BranchPythonOperator, PythonOperator

with DAG(
    dag_id="dags_branch_python_operator",
    start_date=pendulum.datetime(2023, 8, 1, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:
    def select_random():
        import random
        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item in ['B', 'C']:
            return ['task_b', 'task_c']
    python_branch_task = BranchPythonOperator(
        task_id="python_branch_task",
        python_callable=select_random
    )
    def common_func(**kwargs):
        print(kwargs['selected'])
    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected': 'A'}
    )
    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected': 'B'}
    )
    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected': 'C'}
    )
    
    # task flow
    python_branch_task >> [task_a, task_b, task_c]

첫번째로 trigger DAG를 수행한 결과


랜덤으로 뽑은 값이 select_item == 'A'를 만족함에 따라 task_a만이 수행되었다. task_b와 task_c는 skipped 되었음을 확인할 수 있다.


xcom을 확인해본 결과, return_value에 task_a가 들어가게 되었다.


로그를 살펴보면 xcom에서 following라는 key값으로 task_a가 들어가게 되었음을 확인할 수 있고, task_b와, task_c는 스킵처리 되었음을 확인할 수 있다.

trigger DAG를 여러번 실행해 본 결과

profile
공대생의 코딩 정복기

0개의 댓글