https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#branching
BranchPythonOperator
에서 랜덤한 조건에 따라 task_a
만 수행하거나, task_b
와 task_c
를 같이 수행하는 분기 처리# dags/branch_python.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.python import BranchPythonOperator
import pendulum
with DAG(
dag_id="branch_python",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return "task_a"
else:
return ["task_b","task_c"]
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=select_random
)
def print_selected(**kwargs):
print(kwargs["selected"])
task_a = PythonOperator(
task_id="task_a",
python_callable=print_selected,
op_kwargs={"selected":'A'}
)
task_b = PythonOperator(
task_id="task_b",
python_callable=print_selected,
op_kwargs={"selected":'B'}
)
task_c = PythonOperator(
task_id="task_c",
python_callable=print_selected,
op_kwargs={"selected":'C'}
)
branch_task >> [task_a, task_b, task_c]
task_a
만 수행되거나, task_b
와 task_c
가 같이 수행되는 두 가지 경우를 확인![]() | ![]() |
---|
task_a
가 선택되는 작업에서 XCom을 보면 skipmixin_key
키로 {'followed': ['task_a']}
값이 전달되는데, 이를 통해 다른 Task에서도 어떤 분기 처리가 되었는지 확인 가능# branch_task
[2025-06-06, 11:19:07] INFO - Done. Returned value was: task_a: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Branch into task_a: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Following branch {'task_a'}: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Skipping tasks [('task_b', -1), ('task_c', -1)]: source="airflow.task.operators.airflow.providers.standard.operators.python.BranchPythonOperator"
[2025-06-06, 11:19:07] INFO - Skipping downstream tasks.: source="task"
BranchPythonOperator
대신에 @task.branch
데코레이터를 써서 아래와 같이 표현도 가능select_random() >> [task_a, task_b, task_c]
)# dags/branch_python_decorator.py
from airflow.sdk import DAG, task
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
with DAG(
dag_id="branch_python_decorator",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
@task.branch(task_id="branch_task")
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return "task_a"
else:
return ["task_b","task_c"]
def print_selected(**kwargs):
print(kwargs["selected"])
task_a = PythonOperator(
task_id="task_a",
python_callable=print_selected,
op_kwargs={"selected":'A'}
)
task_b = PythonOperator(
task_id="task_b",
python_callable=print_selected,
op_kwargs={"selected":'B'}
)
task_c = PythonOperator(
task_id="task_c",
python_callable=print_selected,
op_kwargs={"selected":'C'}
)
select_random() >> [task_a, task_b, task_c]
choose_branch(self, context)
메서드를 구현해야 하고, 분기 처리 로직을 통해 선택되어야 할 Task를 한 개(문자열) 또는 여러 개(리스트)로 반환해야 함# dags/branch_base.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
with DAG(
dag_id="branch_base",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
class CustomBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return "task_a"
else:
return ["task_b","task_c"]
custom_branch_task = CustomBranchOperator(task_id="custom_branch_task")
def print_selected(**kwargs):
print(kwargs["selected"])
task_a = PythonOperator(
task_id="task_a",
python_callable=print_selected,
op_kwargs={"selected":'A'}
)
task_b = PythonOperator(
task_id="task_b",
python_callable=print_selected,
op_kwargs={"selected":'B'}
)
task_c = PythonOperator(
task_id="task_c",
python_callable=print_selected,
op_kwargs={"selected":'C'}
)
custom_branch_task >> [task_a, task_b, task_c]
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules
옵션 | 설명 |
---|---|
all_success | 기본값, 상위 Task가 모두 성공하면 실행 |
all_failed | 상위 Task가 모두 failed 상태면 실행 |
all_done | 상위 Task가 모두 수행되면 실행 (성공 또는 실패) |
all_skipped | 상위 Task가 모두 skipped 상태면 실행 |
one_failed | 상위 Task 중 하나 이상 실패하면 실행 |
one_success | 상위 Task 중 하나 이상 성공하면 실행 |
one_done | 상위 Task 중 하나 이상 수행되면 실행 (성공 또는 실패) |
none_failed | 상위 Task 중에 failed 상태가 없으면 실행 |
none_failed_min_one_success | 상위 Task 중에 failed 상태가 없고 성공한 Task가 1개 이상이면 실행 |
none_skipped | 상위 Task 중에 skipped 상태가 없으면 실행 |
always | 항상 실행 |
all_done
의 동작을 확인하기 위한 예시 DAG 작성failed
상태를 유발downstream_task
에 trigger_rule
파라미터로 all_done
전달# dags/trigger_rule1.py
from airflow.sdk import DAG, task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="trigger_rule1",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
upstream_task1 = BashOperator(
task_id="upstream_task1",
bash_command="echo upstream1"
)
@task(task_id="upstream_task2")
def upstream_task2():
raise AirflowException("upstream2 Exception")
@task(task_id="upstream_task3")
def upstream_task3():
print("정상 처리")
@task(task_id="downstream_task", trigger_rule="all_done")
def downstream_task():
print("정상 처리")
[upstream_task1, upstream_task2(), upstream_task3()] >> downstream_task()
all_done
은 상위 Task가 성공 또는 실패 여부에 관계없이 모두 수행되면 실행하는 옵션으로, upstream_task2
가 실패 처리되어도 downstream_task
가 수행되는 모습을 확인
none_skipped
의 동작을 확인하기 위한 예시 DAG 작성skipped
상태를 유발downstream_task
에 trigger_rule
파라미터로 none_skipped
전달# dags/trigger_rule2.py
from airflow.sdk import DAG, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="trigger_rule2",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
@task.branch(task_id="branching")
def random_branch():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return "upstream_task_a"
elif selected_item == 'B':
return "upstream_task_b"
elif selected_item == 'C':
return "upstream_task_c"
upstream_task_a = BashOperator(
task_id="upstream_task_a",
bash_command="echo upstream1"
)
@task(task_id="upstream_task_b")
def upstream_task_b():
print("정상 처리")
@task(task_id="upstream_task_c")
def upstream_task_c():
print("정상 처리")
@task(task_id="downstream_task", trigger_rule="none_skipped")
def downstream_task():
print("정상 처리")
random_branch() >> [upstream_task_a, upstream_task_b(), upstream_task_c()] >> downstream_task()
none_skipped
은 상위 Task가 skipped
상태가 아니어야 실행하는 옵션으로, upstream_task1
만 성공하고 나머지는 skipped
처리되었기 때문에, downstream_task
도 수행되지 못하고 skipped
처리
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups
@task_group
데코레이터 또는 TaskGroup
클래스를 활용하여 TaskGroup을 구현tooltip
파라미터로 UI에 표시할 내용을 전달할 수도 있음 (파라미터가 docstring보다 우선)# dags/task_group.py
from airflow.sdk import DAG, task, task_group, TaskGroup
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
with DAG(
dag_id="task_group",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
def inner_function2(**kwargs):
msg = kwargs.get("msg") or str()
print(msg)
@task_group(group_id="first_group")
def first_group():
""" 첫 번째 TaskGroup 에 대한 Tooltip 입니다. """
@task(task_id="inner_function1")
def inner_function1(**kwargs):
print("첫 번째 TaskGroup 내 첫 번째 Task 입니다.")
inner_function2 = PythonOperator(
task_id="inner_function2",
python_callable=inner_function2,
op_kwargs={"msg":"첫 번째 TaskGroup 내 두 번째 Task 입니다."}
)
inner_function1() >> inner_function2
with TaskGroup(group_id="second_group", tooltip="두 번째 TaskGroup 에 대한 Tooltip 입니다.") as second_group:
""" tooltip 파라미터의 내용이 우선적으로 표시됩니다. """
@task(task_id="inner_function1")
def inner_function1(**kwargs):
print("두 번째 TaskGroup 내 첫 번째 Task 입니다.")
inner_function2 = PythonOperator(
task_id="inner_function2",
python_callable=inner_function2,
op_kwargs={"msg": "두 번째 TaskGroup 내 두 번째 Task 입니다."}
)
inner_function1() >> inner_function2
first_group() >> second_group
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#edge-labels
>>
또는 <<
연산자 사이에 Label
을 추가# dags/edge_label.py
from airflow.sdk import DAG, Label
from airflow.providers.standard.operators.empty import EmptyOperator
import pendulum
with DAG(
dag_id="edge_label",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="0 0 * * *",
catchup=False,
tags=["example", "branch"],
) as dag:
empty_1 = EmptyOperator(
task_id="empty_1"
)
empty_2 = EmptyOperator(
task_id="empty_2"
)
empty_1 >> Label("라벨") >> empty_2
empty_3 = EmptyOperator(
task_id="empty_3"
)
empty_4 = EmptyOperator(
task_id="empty_4"
)
empty_5 = EmptyOperator(
task_id="empty_5"
)
empty_6 = EmptyOperator(
task_id="empty_6"
)
empty_2 >> Label("브랜치 시작") >> [empty_3,empty_4,empty_5] >> Label("브랜치 종료") >> empty_6