DAG 실행 중 특정 조건에 따라 다음에 실행할 태스크를 동적으로 선택할 때 사용(분기처리)
task1의 결과에따라 그다음 taska , taskb , taskc를 결정
def select_random():
import random
item_list = ['A','B','C']
select_item = random.choice(item_list)
if select_item == 'A':
return 'task_a'
elif select_item in ['B','C']:
return ['task_b','task_c']
branch_task = BranchPythonOperator(
task_id = "python_branch_task",
python_callable= select_random
)
예시의 함수처럼 random 결과에 따라 실행될 task를 결정
# 실제 작업을 수행할 함수들
def run_task_a():
print(">>> Running Task A logic")
def run_task_b():
print(">>> Running Task B logic")
def run_task_c():
print(">>> Running Task C logic")
def end_task():
print(">>> DAG finished successfully")
dag: ## 실행할 bash 명령어
# 브랜치 태스크
branch_task = BranchPythonOperator(
task_id="python_branch_task",
python_callable=select_random
)
# 브랜치 이후 태스크들
task_a = PythonOperator(
task_id='task_a',
python_callable=run_task_a
)
task_b = PythonOperator(
task_id='task_b',
python_callable=run_task_b
)
task_c = PythonOperator(
task_id='task_c',
python_callable=run_task_c
)
# 브랜치 이후 모든 경로를 합칠 엔드 태스크
task_end = PythonOperator(
task_id='end',
python_callable=end_task,
trigger_rule='none_failed_or_skipped' # 일부 태스크가 skipped 되어도 실행
)
branch_task >> [task_a, task_b, task_c]
task_a >> task_end
task_b >> task_end
task_c >> task_end
그림으로 도식화 해보자면 이런 task 형태일것이다.
+----------------+ | branch_task | +----------------+ / | \ / | \ v v v +---------+ +---------+ +---------+ | task_a | | task_b | | task_c | +---------+ +---------+ +---------+ \ | / \ | / v v v +----------------+ | task_end | +----------------+
task - graph

python_branch_task >> task_a >> end
task b , task c >> skipped
3번째는 python_branch_task >> task_b,task_c >> end
결과를 반환