task 분기처리

yoon__0_0·2024년 6월 17일
0

이어드림 수업

목록 보기
65/103

분기 처리가 필요한 이유

  • task1 의 결과에 따라 task2-x중 하나만 수행하도록 구성해야 할때

분기처리 방법

1) BranchPythonOperator
2) task.branch 데코레이터
3) BaseBranchOperator 상속하여 직접개발

BranchPythonOperator

  • a가 선택되면 task_a가 실행되고, b혹은 c가 선택이 된다면 task_b, task_c가 모두 실행됨
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id='dags_branch_python_operator',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'), 
    schedule='0 1 * * *',
    catchup=False
) as dag:
    def select_random():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a' # 이 값의 task_id를 가진 task를 실행하기로 함. 
        elif selected_item in ['B','C']:
            return ['task_b','task_c']
    
    python_branch_task = BranchPythonOperator(
        task_id='python_branch_task',
        python_callable=select_random # 분기할 로직이 담긴 함수 
    )
    
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    python_branch_task >> [task_a, task_b, task_c]
    
  • 결과
    선택되지 않은 것은 skipped 상태가 됨.

task.branch 데코레이터

  • 좀더 간편하게 실행 할 수 있음.
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id='dags_python_with_branch_decorator',
    start_date=datetime(2024,6,17),
    schedule=None,
    catchup=False
) as dag:
    @task.branch(task_id='python_branch_task')
    def select_random():
        import random
        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item in ['B','C']:
            return ['task_b','task_c']
    
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    select_random() >> [task_a, task_b, task_c]

BaseBranchOperator 상속하여 직접개발

  • context 변수를 직접 컨트롤 할 수 있다는 것이 장점
from airflow import DAG
import pendulum
from airflow.operators.branch import BaseBranchOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='dags_base_branch_operator',
    start_date=pendulum.datetime(2024,6,17, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:
    class CustomBranchOperator(BaseBranchOperator):
        def choose_branch(self, context):
            import random
            print(context)
            
            item_lst = ['A', 'B', 'C']
            selected_item = random.choice(item_lst)
            if selected_item == 'A':
                return 'task_a'
            elif selected_item in ['B','C']:
                return ['task_b','task_c']

    
    custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')

    
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    custom_branch_operator >> [task_a, task_b, task_c]
profile
신윤재입니다

0개의 댓글