[Airflow] Branch Operator

minyeamer·2025년 6월 6일
0

Apache Airflow 배우기

목록 보기
7/13
post-thumbnail

Branching

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#branching

  • 특정 Task의 결과에 따라 하위 Task를 선별해서 수행시키고 싶을 때 사용

branching

BranchPythonOperator

Branching 활용

  • BranchPythonOperator 에서 랜덤한 조건에 따라 task_a 만 수행하거나, task_btask_c 를 같이 수행하는 분기 처리
# dags/branch_python.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.python import BranchPythonOperator
import pendulum

with DAG(
    dag_id="branch_python",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    def select_random():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return "task_a"
        else:
            return ["task_b","task_c"]

    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=select_random
    )

    def print_selected(**kwargs):
        print(kwargs["selected"])

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=print_selected,
        op_kwargs={"selected":'A'}
    )

    task_b = PythonOperator(
        task_id="task_b",
        python_callable=print_selected,
        op_kwargs={"selected":'B'}
    )

    task_c = PythonOperator(
        task_id="task_c",
        python_callable=print_selected,
        op_kwargs={"selected":'C'}
    )

    branch_task >> [task_a, task_b, task_c]

Branching 결과

  • 여러 번 Trigger하여 실행했는데, 의도대로 task_a 만 수행되거나, task_btask_c 가 같이 수행되는 두 가지 경우를 확인
task_atask_bc
  • 또한, task_a 가 선택되는 작업에서 XCom을 보면 skipmixin_key 키로 {'followed': ['task_a']} 값이 전달되는데, 이를 통해 다른 Task에서도 어떤 분기 처리가 되었는지 확인 가능

skipmixin_key

  • 마찬가지로 실행 로그에서도 어떤 Task가 선택되었고, 어떤 Task가 Skip되었는지 조회 가능
# branch_task

[2025-06-06, 11:19:07] INFO - Done. Returned value was: task_a: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Branch into task_a: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Following branch {'task_a'}: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Skipping tasks [('task_b', -1), ('task_c', -1)]: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Skipping downstream tasks.: source="task"

@task.branch

  • BranchPythonOperator 대신에 @task.branch 데코레이터를 써서 아래와 같이 표현도 가능
  • 함수를 호출하여 종속성 표현 (select_random() >> [task_a, task_b, task_c])
# dags/branch_python_decorator.py

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.python import PythonOperator
import pendulum

with DAG(
    dag_id="branch_python_decorator",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    @task.branch(task_id="branch_task")
    def select_random():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return "task_a"
        else:
            return ["task_b","task_c"]

    def print_selected(**kwargs):
        print(kwargs["selected"])

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=print_selected,
        op_kwargs={"selected":'A'}
    )

    task_b = PythonOperator(
        task_id="task_b",
        python_callable=print_selected,
        op_kwargs={"selected":'B'}
    )

    task_c = PythonOperator(
        task_id="task_c",
        python_callable=print_selected,
        op_kwargs={"selected":'C'}
    )

    select_random() >> [task_a, task_b, task_c]

BaseBranchOperator

https://airflow.apache.org/docs/apache-airflow/2.9.2/_api/airflow/operators/branch/index.html#airflow.operators.branch.BaseBranchOperator

  • Branching 기능을 제공하는 Operator의 기본 클래스
  • 해당 클래스를 상속받을 경우 choose_branch(self, context) 메서드를 구현해야 하고, 분기 처리 로직을 통해 선택되어야 할 Task를 한 개(문자열) 또는 여러 개(리스트)로 반환해야 함
# dags/branch_base.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.providers.standard.operators.python import PythonOperator
import pendulum

with DAG(
    dag_id="branch_base",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    class CustomBranchOperator(BaseBranchOperator):
        def choose_branch(self, context):
            import random

            item_lst = ['A','B','C']
            selected_item = random.choice(item_lst)
            if selected_item == 'A':
                return "task_a"
            else:
                return ["task_b","task_c"]

    custom_branch_task = CustomBranchOperator(task_id="custom_branch_task")

    def print_selected(**kwargs):
        print(kwargs["selected"])

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=print_selected,
        op_kwargs={"selected":'A'}
    )

    task_b = PythonOperator(
        task_id="task_b",
        python_callable=print_selected,
        op_kwargs={"selected":'B'}
    )

    task_c = PythonOperator(
        task_id="task_c",
        python_callable=print_selected,
        op_kwargs={"selected":'C'}
    )

    custom_branch_task >> [task_a, task_b, task_c]

Trigger Rule

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules

  • 상위 Task들의 상태에 따라 수행여부를 결정하고 싶을 때 사용
  • 기본적으로는 상위 Task가 모두 성공해야 실행

trigger

Trigger Rule 종류

옵션설명
all_success기본값, 상위 Task가 모두 성공하면 실행
all_failed상위 Task가 모두 failed 상태면 실행
all_done상위 Task가 모두 수행되면 실행 (성공 또는 실패)
all_skipped상위 Task가 모두 skipped 상태면 실행
one_failed상위 Task 중 하나 이상 실패하면 실행
one_success상위 Task 중 하나 이상 성공하면 실행
one_done상위 Task 중 하나 이상 수행되면 실행 (성공 또는 실패)
none_failed상위 Task 중에 failed 상태가 없으면 실행
none_failed_min_one_success상위 Task 중에 failed 상태가 없고 성공한 Task가 1개 이상이면 실행
none_skipped상위 Task 중에 skipped 상태가 없으면 실행
always항상 실행

all_done 예시

  • all_done 의 동작을 확인하기 위한 예시 DAG 작성
  • 3개의 상위 Task 중 2번째 Task에서 의도적으로 예외를 발생시켜서 failed 상태를 유발
  • 하위 Task downstream_tasktrigger_rule 파라미터로 all_done 전달
# dags/trigger_rule1.py

from airflow.sdk import DAG, task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_rule1",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    upstream_task1 = BashOperator(
        task_id="upstream_task1",
        bash_command="echo upstream1"
    )

    @task(task_id="upstream_task2")
    def upstream_task2():
        raise AirflowException("upstream2 Exception")

    @task(task_id="upstream_task3")
    def upstream_task3():
        print("정상 처리")

    @task(task_id="downstream_task", trigger_rule="all_done")
    def downstream_task():
        print("정상 처리")

    [upstream_task1, upstream_task2(), upstream_task3()] >> downstream_task()
  • all_done 은 상위 Task가 성공 또는 실패 여부에 관계없이 모두 수행되면 실행하는 옵션으로, upstream_task2 가 실패 처리되어도 downstream_task 가 수행되는 모습을 확인

trigger-ex1

none_skipped 예시

  • none_skipped 의 동작을 확인하기 위한 예시 DAG 작성
  • 3개의 상위 Task 중 랜덤한 한 Task만 수행하고 나머지 Task에선 skipped 상태를 유발
  • 하위 Task downstream_tasktrigger_rule 파라미터로 none_skipped 전달
# dags/trigger_rule2.py

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_rule2",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    @task.branch(task_id="branching")
    def random_branch():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return "upstream_task_a"
        elif selected_item == 'B':
            return "upstream_task_b"
        elif selected_item == 'C':
            return "upstream_task_c"

    upstream_task_a = BashOperator(
        task_id="upstream_task_a",
        bash_command="echo upstream1"
    )

    @task(task_id="upstream_task_b")
    def upstream_task_b():
        print("정상 처리")

    @task(task_id="upstream_task_c")
    def upstream_task_c():
        print("정상 처리")

    @task(task_id="downstream_task", trigger_rule="none_skipped")
    def downstream_task():
        print("정상 처리")

    random_branch() >> [upstream_task_a, upstream_task_b(), upstream_task_c()] >> downstream_task()
  • none_skipped 은 상위 Task가 skipped 상태가 아니어야 실행하는 옵션으로, upstream_task1 만 성공하고 나머지는 skipped 처리되었기 때문에, downstream_task 도 수행되지 못하고 skipped 처리

trigger-ex1

TaskGroup

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups

  • 여러 Task들을 그룹화하는 개념
  • UI 상에서 Task들을 모아서 편하게 보고 관리하기 쉽게 하기 위한 목적

task-group

TaskGroup 활용

  • @task_group 데코레이터 또는 TaskGroup 클래스를 활용하여 TaskGroup을 구현
  • docstring을 추가해 Airflow UI에서 TaskGroup에 대한 Tooltip을 표시
    • 또는, tooltip 파라미터로 UI에 표시할 내용을 전달할 수도 있음 (파라미터가 docstring보다 우선)
# dags/task_group.py

from airflow.sdk import DAG, task, task_group, TaskGroup
from airflow.providers.standard.operators.python import PythonOperator
import pendulum

with DAG(
    dag_id="task_group",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    def inner_function2(**kwargs):
        msg = kwargs.get("msg") or str() 
        print(msg)

    @task_group(group_id="first_group")
    def first_group():
        """ 첫 번째 TaskGroup 에 대한 Tooltip 입니다. """

        @task(task_id="inner_function1")
        def inner_function1(**kwargs):
            print("첫 번째 TaskGroup 내 첫 번째 Task 입니다.")

        inner_function2 = PythonOperator(
            task_id="inner_function2",
            python_callable=inner_function2,
            op_kwargs={"msg":"첫 번째 TaskGroup 내 두 번째 Task 입니다."}
        )

        inner_function1() >> inner_function2

    with TaskGroup(group_id="second_group", tooltip="두 번째 TaskGroup 에 대한 Tooltip 입니다.") as second_group:
        """ tooltip 파라미터의 내용이 우선적으로 표시됩니다. """
        @task(task_id="inner_function1")
        def inner_function1(**kwargs):
            print("두 번째 TaskGroup 내 첫 번째 Task 입니다.")

        inner_function2 = PythonOperator(
            task_id="inner_function2",
            python_callable=inner_function2,
            op_kwargs={"msg": "두 번째 TaskGroup 내 두 번째 Task 입니다."}
        )

        inner_function1() >> inner_function2

    first_group() >> second_group

TaskGroup 조회

  • DAG 실행 후 Graph View에서 두 개의 TaskGroup을 확인
  • 기대와 다르게 지정한 Tooltip이 표시되지 않았는데, Airflow 3.0 버전의 버그인 것으로 추정

task-group1

  • TaskGroup을 클릭하면 펼쳐지면서 내부 Task를 표시

task-group2

Edge Label

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#edge-labels

  • Task 연결에 대한 설명을 추가하는 개념
  • Task 종속성을 나타내는 >> 또는 << 연산자 사이에 Label 을 추가

Edge Label 활용

  • 첫 번째 Label은 두 개의 단일 Task 사이를 연결
  • 두 번째와 세 번째 Label은 Branch의 시작과 끝을 각각 연결
# dags/edge_label.py

from airflow.sdk import DAG, Label
from airflow.providers.standard.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id="edge_label",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    empty_1 = EmptyOperator(
        task_id="empty_1"
    )

    empty_2 = EmptyOperator(
        task_id="empty_2"
    )

    empty_1 >> Label("라벨") >> empty_2

    empty_3 = EmptyOperator(
        task_id="empty_3"
    )

    empty_4 = EmptyOperator(
        task_id="empty_4"
    )

    empty_5 = EmptyOperator(
        task_id="empty_5"
    )

    empty_6 = EmptyOperator(
        task_id="empty_6"
    )

    empty_2 >> Label("브랜치 시작") >> [empty_3,empty_4,empty_5] >> Label("브랜치 종료") >> empty_6

Edge Label 조회

  • Airflow UI의 Graph View에서 Edge Label을 확인
  • Branch 연결에 대해서는 모든 연결에 동일한 내용의 Label을 표시

edge-label

profile
데이터의 모든 것을 추구합니다.

0개의 댓글