Airflow 분기처리(branch)

BAO.DE·2025년 8월 27일

Apache Airflow

목록 보기
13/20

Branch PythonOperator 개념

DAG 실행 중 특정 조건에 따라 다음에 실행할 태스크를 동적으로 선택할 때 사용(분기처리)

task1의 결과에따라 그다음 taska , taskb , taskc를 결정

    def select_random():
        import random
        item_list = ['A','B','C']
        select_item = random.choice(item_list)
        if select_item == 'A':
            return 'task_a'
        elif select_item in ['B','C']:
            return ['task_b','task_c']
    
    branch_task = BranchPythonOperator(
        task_id = "python_branch_task",
        python_callable= select_random
    )

    
    
    예시의 함수처럼 random 결과에 따라 실행될 task를 결정 
# 실제 작업을 수행할 함수들
def run_task_a():
    print(">>> Running Task A logic")

def run_task_b():
    print(">>> Running Task B logic")

def run_task_c():
    print(">>> Running Task C logic")

def end_task():
    print(">>> DAG finished successfully")

Dag 작성

dag: ## 실행할 bash 명령어
      # 브랜치 태스크
    branch_task = BranchPythonOperator(
        task_id="python_branch_task",
        python_callable=select_random
    )

    # 브랜치 이후 태스크들
    task_a = PythonOperator(
        task_id='task_a',
        python_callable=run_task_a
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=run_task_b
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=run_task_c
    )

    # 브랜치 이후 모든 경로를 합칠 엔드 태스크
    task_end = PythonOperator(
        task_id='end',
        python_callable=end_task,
        trigger_rule='none_failed_or_skipped'  # 일부 태스크가 skipped 되어도 실행
    )

    branch_task >> [task_a, task_b, task_c]
    task_a >> task_end
    task_b >> task_end
    task_c >> task_end

그림으로 도식화 해보자면 이런 task 형태일것이다.

       +----------------+
       | branch_task    |
       +----------------+
         /      |       \
        /       |        \
       v        v         v
  +---------+ +---------+ +---------+
  | task_a  | | task_b  | | task_c  |
  +---------+ +---------+ +---------+
       \        |        /
        \       |       /
         v      v      v
       +----------------+
       |   task_end     |
       +----------------+

task - graph

실행결과

python_branch_task >> task_a >> end
task b , task c >> skipped

3번째는 python_branch_task >> task_b,task_c >> end
결과를 반환

0개의 댓글