
이 게시물은
airflow 2.10.5버전을 사용해서 작성됐습니다.
3.x버전과 조금 다를 수 있으니 유의하시기 바랍니다.
가끔은 하나의 task 에서 처리된 결과에 따라 다음에 연결된
여러개의 task 들 중에서 특정 task 를 실행하고 싶을 때가 있습니다.
이럴 때는 Airflow 에서는 제공하는 3가지 방법 중 하나를 사용하면됩니다.
지금부터 하나하나 알아보죠.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
import pendulum
with DAG(
dag_id="dags_branch_sample1",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
) as dag:
# 랜덤하게 다음 task_id 를 고르는 메소드 생성
# return 값이 뭐냐에 따라서 다음에 수행할 task 가 정해집니다!
def select_next_task():
import random
random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
if random_num == 1:
return "downstream_task1"
elif random_num == 2:
return "downstream_task2"
elif random_num == 3:
# 한번에 2개 이상의 task 를 지정하려면 아래처럼 리스트로 반환!
return ["downstream_task1", "downstream_task2"]
random_select_task = BranchPythonOperator(
task_id='random_select_task',
python_callable=select_next_task
)
# random_num = 1 또는 random_num = 3 일 때 실행되는 task
downstream_task1 = BashOperator(
task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
)
# random_num = 2 또는 random_num = 3 일 때 실행되는 task
downstream_task2 = BashOperator(
task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
)
random_select_task >> [downstream_task1, downstream_task2]
DAG 를 실행해봅시다.

실행한 후 그래프를 보면 downstream_task1 은 skipped 되서 실행이 안됐고,
downstream_task2 만 실행된 것을 확인할 수 있습니다.
이말은 random_select_task 에서 사용하는 함수에서
return 값이 'downstream_task2' 여서 그런 겁니다.

BranchPythonOperator 의 함수에서 return 한 문자열이 위 그림처럼
XCOM 에 skipmixin_key 에 들어가네요. Task 실행 흐름을 파악할 때 보면 좋겠네요 :)

별로 중요하진 않지만 downstream_task2 의 LOG 가 찍히는 것도 확인할 수 있습니다.
참고:
만약에 BranchPythonOperator 가 ["downstream_task1", "downstream_task2"]
처럼 리스트를 반환했다면 2개의 하위 task 가 실행됩니다.
그런 경우에 Graph 와 BranchPythonOperator 의 XCOM 상태는 다음과 같습니다.


BranchPythonOperator 의 데코레이터 버전인 @task.branch 을 사용해서
Task 분기처리를 해보겠습니다. BranchPythonOperator 클래스 를 사용할 때와
동일하게 동작하도록 코드를 작성해봤습니다.
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
import pendulum
with DAG(
dag_id="dags_branch_sample1",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
) as dag:
@task.branch(task_id="random_select_task")
def select_next_task():
import random
random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
if random_num == 1:
return "downstream_task1"
elif random_num == 2:
return "downstream_task2"
elif random_num == 3:
return ["downstream_task1", "downstream_task2"]
downstream_task1 = BashOperator(
task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
)
downstream_task2 = BashOperator(
task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
)
select_next_task() >> [downstream_task1, downstream_task2]
동작의 결과는
BranchPythonOperator동일하므로 따로 실행을 결과를 보지는 않겠습니다.
마지막으로 BaseBranchOperator 를 상속하는 클래스를 생성하는 방법입니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.branch import BaseBranchOperator
import pendulum
with DAG(
dag_id="dags_branch_sample1",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
) as dag:
class RandomNextTaskSelectOperator(BaseBranchOperator):
def choose_branch(self, context):
import random
random_num = random.randint(1,3) # 1 ~ 3 까지의 숫자를 조회한다.
if random_num == 1:
return "downstream_task1"
elif random_num == 2:
return "downstream_task2"
elif random_num == 3:
return ["downstream_task1", "downstream_task2"]
random_selector_task = RandomNextTaskSelectOperator(task_id="random_selector_task")
downstream_task1 = BashOperator(
task_id="downstream_task1", bash_command='echo "I AM TASK 11111"'
)
downstream_task2 = BashOperator(
task_id="downstream_task2", bash_command='echo "I AM TASK 22222"'
)
random_selector_task >> [downstream_task1, downstream_task2]
동작의 결과는
BranchPythonOperator동일하므로 따로 실행을 결과를 보지는 않겠습니다.
이상으로 Task 의 분기처리를 할 수 있는 3가지 방법을 모두 알아봤습니다.
여태까지의 분기처리를 위한 task 구조는 하나의 상위 task 에서 여러 하위 task 들이
의존하고 있는 형태였습니다. (아래그림 참고)

이번에는 반대로 여러개의 상위 task 가 하나의 하위 task 가 있는 구조라고 가정해봅시다.

이런 구조에서는 종종 하위 task 의 동작 방식의 상위 task 의 동작결과에 따라
다르게 동작하길 바라는 경우가 있습니다.
예를 들어서 ...
... 처럼 말이죠.
이런 경우를 위해서 Airflow 에서는 Trigger Rule 이라는 설정값을 사용합니다.
이 설정은 위 그림에서 본 하위 task 의 Operator 에 설정할 수 있습니다.
Trigger Rule 에는 어떤 종류가 있는지 알아봅시다.
all_success (기본값) :all_failed :all_done:all_skipped:one_failed:one_success:
(모든 상위 태스크의 완료 여부와 관계없이) 하나 이상의 상위 태스크가 성공하면 실행
one_done:
적어도 하나의 상위 태스크가 완료(= 성공이나 실패)이면 실행
none_failed:
상위 태스크 중 실패가 없는 경우(=성공 또는 Skipped) 실행
none_failed_min_one_success:
상위 태스크 중 실패가 없고 성공한 태스트가 적어도 1개 이상이면 실행
none_skipped:
Skip 된 상위 태스크가 없으면 실행 (= 상위 태스크 성공/실패 여부 상관 X)
always:
어떤 경우에나 실행
모든 Trigger Rule 을 테스트하기에는 너무 양이 많으니,
all_done 만 테스트 코드를 작성해보고 결과를 보겠습니다.
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pendulum
with DAG(
dag_id="dags_trigger_rule_sample1",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
) as dag:
# 50 % 의 확률로 Exception 을 던지는 메소드 작성
def half_and_half():
import random
random_num = random.randint(1,2) # 확률은 반반!
if random_num == 1:
raise AirflowException('뭔가 잘못됐음')
elif random_num == 2:
return "downstream_task2"
pt1 = PythonOperator(task_id="pt1", python_callable=half_and_half)
pt2 = PythonOperator(task_id="pt2", python_callable=half_and_half)
downstream_task = BashOperator(
task_id="downstream_task",
bash_command='echo "All upstream task Success!"',
trigger_rule="all_done", # 실패든 성공이든 상관없이 모든 상위 task 가 끝나면 실행!
)
[pt1, pt2] >> downstream_task
DAG 를 여러번 실행해서 상위 태스크가 실패든 성공이든 상관없이
downstream_task 실행되는지를 한번 확인해봅시다.

Dag Trigger 를 여러번 해서 실행해본 결과 위 그림처럼 상위 태스크 2개가
실패든 성공이든 무조건 downstream_task 가 실행되는 것을 확인할 수 있습니다.
이로서 Task 의 분기처리 및 Trigger Rule 에 대해서 가볍게 알아봤습니다.
이만 글을 마치도록 하겠습니다.
읽어주셔서 감사합니다.