선행 task(task1)의 결과에 따라 여러 하위 task 중 하나(또는 일부)만 수행해야 할 때 사용한다.

선행 task의 return 값은 후행 task의 task_id여야 하며, 문자열(여러 개를 선택할 때는 문자열 리스트)로 return해야 한다.
def select_random():
    """Choose which downstream task(s) should run by drawing a random item.

    Returns:
        The string 'task_1' when '1' is drawn; otherwise the list
        ['task_2', 'task_3']. The values are task_id strings that the
        branch operator uses to decide which downstream tasks to follow.
    """
    # Kept as a function-scope import, as in the original snippet.
    from random import choice

    item_lst = ['1', '2', '3']
    selected_item = choice(item_lst)
    if selected_item == '1':
        return 'task_1'
    # '2' and '3' both lead to the same pair of downstream tasks.
    return ['task_2', 'task_3']
def common_func(**kwargs):
    """Print the value passed under the 'selected' keyword.

    Args:
        **kwargs: Keyword arguments; must contain the key 'selected'.

    Raises:
        KeyError: If 'selected' was not supplied.
    """
    print(kwargs['selected'])
# Branching task: runs select_random and uses its return value (a task_id
# string or a list of task_id strings) to pick the downstream task(s).
python_branch_task = BranchPythonOperator(
    task_id='python_branch_task',
    python_callable=select_random,
)

# Three candidate downstream tasks. Each calls common_func with a different
# 'selected' value so the printed output shows which branch actually ran.
task_1 = PythonOperator(
    task_id='task_1',
    python_callable=common_func,
    op_kwargs={'selected': '1'},
)

task_2 = PythonOperator(
    task_id='task_2',
    python_callable=common_func,
    op_kwargs={'selected': '2'},
)

task_3 = PythonOperator(
    task_id='task_3',
    python_callable=common_func,
    op_kwargs={'selected': '3'},
)

# All three candidates are wired downstream of the branching task.
python_branch_task >> [task_1, task_2, task_3]
@task.branch 데코레이터를 쓸 때는 아래 2가지만 유의하면 나머지는 위와 같음.
@task.branch(task_id = 'python_branch_task')
select_random() >>[task_1 , task_2, task_3]
@task.branch(task_id='python_branch_task')
def select_random():
    """Decorator-based branch callable: pick the downstream task(s) at random.

    Returns:
        The string 'task_1' when '1' is drawn; otherwise the list
        ['task_2', 'task_3']. The returned values are the task_id strings
        of the downstream tasks to follow.
    """
    # Kept as a function-scope import, as in the original snippet.
    from random import choice

    item_lst = ['1', '2', '3']
    selected_item = choice(item_lst)
    if selected_item == '1':
        return 'task_1'
    # '2' and '3' both lead to the same pair of downstream tasks.
    return ['task_2', 'task_3']
많이 안 쓰여서 pass ... 나중에 필요하면 다시 공부함