task1
의 결과로 'GOOD', 'BAD', 'Pending'이라는 결과 3개 중 하나가 나오고 그에 따라 task2-1
~ task 2-3
중 하나가 실행되도록 해야할 경우후행 task이름
이기 때문에 꼼꼼히 작성해야 한다.동시에
수행시키고 싶다면 리스트
형태로 후행 task의 이름을 전달해주면 된다.String
값으로 작성해주면 된다.from airflow import DAG
import pendulum
from airflow.operators.python import BranchPythonOperator, PythonOperator
with DAG(
dag_id="dags_branch_python_operator",
start_date=pendulum.datetime(2023, 8, 1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def select_random():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B', 'C']:
return ['task_b', 'task_c']
python_branch_task = BranchPythonOperator(
task_id="python_branch_task",
python_callable=select_random
)
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected': 'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected': 'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected': 'C'}
)
# task flow
python_branch_task >> [task_a, task_b, task_c]
첫번째로 trigger DAG를 수행한 결과
랜덤으로 뽑은 값이 select_item == 'A'를 만족함에 따라 task_a만이 수행되었다. task_b와 task_c는 skipped
되었음을 확인할 수 있다.
xcom을 확인해본 결과, return_value에 task_a가 들어가게 되었다.
로그를 살펴보면 xcom에서 following라는 key값으로 task_a가 들어가게 되었음을 확인할 수 있고, task_b와, task_c는 스킵처리 되었음을 확인할 수 있다.
trigger DAG를 여러번 실행해 본 결과