[Airflow] Task 의 분기 처리와 Trigger Rule

식빵·2025년 6월 14일
0

Airflow

목록 보기
6/9
post-thumbnail

이 게시물은 airflow 2.10.5 버전을 사용해서 작성됐습니다.
3.x 버전과 조금 다를 수 있으니 유의하시기 바랍니다.

📌 Task 분기처리

가끔은 하나의 task 에서 처리된 결과에 따라 다음에 연결된
여러개의 task 들 중에서 특정 task 를 실행하고 싶을 때가 있습니다.

이럴 때는 Airflow 에서는 제공하는 3가지 방법 중 하나를 사용하면됩니다.
지금부터 하나하나 알아보죠.


1. BranchPythonOperator class

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
import pendulum


with DAG(
    dag_id="dags_branch_sample1",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:

	# 랜덤하게 다음 task_id 를 고르는 메소드 생성
    # return 값이 뭐냐에 따라서 다음에 수행할 task 가 정해집니다!
    def select_next_task():
        import random
        random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
        
        if random_num == 1:
            return "downstream_task1"
        elif random_num == 2:
            return "downstream_task2"
        elif random_num == 3:
        	# 한번에 2개 이상의 task 를 지정하려면 아래처럼 리스트로 반환!
            return ["downstream_task1", "downstream_task2"]

    random_select_task = BranchPythonOperator(
        task_id='random_select_task',
        python_callable=select_next_task
    )

	# random_num = 1 또는 random_num = 3 일 때 실행되는 task
    downstream_task1 = BashOperator(
        task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
    )

	# random_num = 2 또는 random_num = 3 일 때 실행되는 task
    downstream_task2 = BashOperator(
        task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
    )

    random_select_task >> [downstream_task1, downstream_task2]

DAG 를 실행해봅시다.


실행한 후 그래프를 보면 downstream_task1 은 skipped 되서 실행이 안됐고,
downstream_task2 만 실행된 것을 확인할 수 있습니다.

이말은 random_select_task 에서 사용하는 함수에서
return 값이 'downstream_task2' 여서 그런 겁니다.


BranchPythonOperator 의 함수에서 return 한 문자열이 위 그림처럼
XCOMskipmixin_key 에 들어가네요. Task 실행 흐름을 파악할 때 보면 좋겠네요 :)


별로 중요하진 않지만 downstream_task2 의 LOG 가 찍히는 것도 확인할 수 있습니다.


참고:
만약에 BranchPythonOperator["downstream_task1", "downstream_task2"]
처럼 리스트를 반환했다면 2개의 하위 task 가 실행됩니다.
그런 경우에 GraphBranchPythonOperatorXCOM 상태는 다음과 같습니다.



2. @task.branch 데코레이터

BranchPythonOperator 의 데코레이터 버전인 @task.branch 을 사용해서
Task 분기처리를 해보겠습니다. BranchPythonOperator 클래스 를 사용할 때와
동일하게 동작하도록 코드를 작성해봤습니다.

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
import pendulum


with DAG(
    dag_id="dags_branch_sample1",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:

    @task.branch(task_id="random_select_task")
    def select_next_task():
        import random
        random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
        if random_num == 1:
            return "downstream_task1"
        elif random_num == 2:
            return "downstream_task2"
        elif random_num == 3:
            return ["downstream_task1", "downstream_task2"]

    downstream_task1 = BashOperator(
        task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
    )

    downstream_task2 = BashOperator(
        task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
    )

    select_next_task() >> [downstream_task1, downstream_task2]

동작의 결과는 BranchPythonOperator 동일하므로 따로 실행을 결과를 보지는 않겠습니다.



3. BaseBranchOperator 상속

마지막으로 BaseBranchOperator 를 상속하는 클래스를 생성하는 방법입니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.branch import BaseBranchOperator
import pendulum

with DAG(
    dag_id="dags_branch_sample1",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:

    class RandomNextTaskSelectOperator(BaseBranchOperator):
        def choose_branch(self, context):
            import random
            random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
            if random_num == 1:
                return "downstream_task1"
            elif random_num == 2:
                return "downstream_task2"
            elif random_num == 3:
                return ["downstream_task1", "downstream_task2"]

    random_selector_task = RandomNextTaskSelectOperator(task_id="random_selector_task")

    downstream_task1 = BashOperator(
        task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
    )

    downstream_task2 = BashOperator(
        task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
    )

    random_selector_task >> [downstream_task1, downstream_task2]

동작의 결과는 BranchPythonOperator 동일하므로 따로 실행을 결과를 보지는 않겠습니다.


이상으로 Task 의 분기처리를 할 수 있는 3가지 방법을 모두 알아봤습니다.




📌 Trigger Rule

여태까지의 분기처리를 위한 task 구조는 하나의 상위 task 에서 여러 하위 task 들이
의존하고 있는 형태였습니다. (아래그림 참고)

이번에는 반대로 여러개의 상위 task 가 하나의 하위 task 가 있는 구조라고 가정해봅시다.

이런 구조에서는 종종 하위 task 의 동작 방식의 상위 task 의 동작결과에 따라
다르게 동작하길 바라는 경우가 있습니다.

예를 들어서 ...

  • 상위의 모든 task 들이 성공했을 때만 하위 task 실행
  • 상위의 모든 task 들이 실패했을 때만 하위 task 실행

... 처럼 말이죠.

이런 경우를 위해서 Airflow 에서는 Trigger Rule 이라는 설정값을 사용합니다.
이 설정은 위 그림에서 본 하위 taskOperator 에 설정할 수 있습니다.
Trigger Rule 에는 어떤 종류가 있는지 알아봅시다.


Trigger Rule 종류

  • all_success (기본값) :
    상위 태스크 모두 성공하면 실행
  • all_failed :
    상위 태스크가 모두 실패하면 실행
  • all_done:
    상위 태스크의 모두 실행이 완료(=실패도 포함!)되면 실행
  • all_skipped:
    모든 상위 태스크가 skipped 상태면 실행
  • one_failed:
    (모든 상위 태스크의 완료 여부와 관계없이) 하나 이상의 상위 태스크가 실패면 실행
  • one_success:
    (모든 상위 태스크의 완료 여부와 관계없이) 하나 이상의 상위 태스크가 성공하면 실행

  • one_done:
    적어도 하나의 상위 태스크가 완료(= 성공이나 실패)이면 실행

  • none_failed:
    상위 태스크 중 실패가 없는 경우(=성공 또는 Skipped) 실행

  • none_failed_min_one_success:
    상위 태스크 중 실패가 없고 성공한 태스트가 적어도 1개 이상이면 실행

  • none_skipped:
    Skip 된 상위 태스크가 없으면 실행 (= 상위 태스크 성공/실패 여부 상관 X)

  • always:
    어떤 경우에나 실행



간단한 실습 코드

모든 Trigger Rule 을 테스트하기에는 너무 양이 많으니,
all_done 만 테스트 코드를 작성해보고 결과를 보겠습니다.

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pendulum


with DAG(
    dag_id="dags_trigger_rule_sample1",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:

    # 50 % 의 확률로 Exception 을 던지는 메소드 작성
    def half_and_half():
        import random
        random_num = random.randint(1,2) # 확률은 반반!
        if random_num == 1:
            raise AirflowException('뭔가 잘못됐음')
        elif random_num == 2:
            return "downstream_task2"

    pt1 = PythonOperator(task_id="pt1", python_callable=half_and_half)
    pt2 = PythonOperator(task_id="pt2", python_callable=half_and_half)

    downstream_task = BashOperator(
        task_id="downstream_task",
        bash_command='echo "All upstream task Success!"',
        trigger_rule="all_done", # 실패든 성공이든 상관없이 모든 상위 task 가 끝나면 실행!
    )

    [pt1, pt2] >> downstream_task

DAG 를 여러번 실행해서 상위 태스크가 실패든 성공이든 상관없이
downstream_task 실행되는지를 한번 확인해봅시다.

Dag Trigger 를 여러번 해서 실행해본 결과 위 그림처럼 상위 태스크 2개가
실패든 성공이든 무조건 downstream_task 가 실행되는 것을 확인할 수 있습니다.


이로서 Task 의 분기처리 및 Trigger Rule 에 대해서 가볍게 알아봤습니다.
이만 글을 마치도록 하겠습니다.
읽어주셔서 감사합니다.



참고한 링크

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글