Airflow Study16- BranchPython Operator

박성현·2024년 6월 8일
0

Airflow

목록 보기
23/28

Task 분기처리 유형

task1의 결과에 따라 하위 task하나만 수행해야 할때 사용

1. BranchPythonOperator

선행 task에 return 값이 후행 task의 id 인데 string 으로 return 필요

 
def select_random():
	from random import choice
        
	item_lst = ['1','2','3']
	selected_item = choice(item_lst)
        
	if selected_item == '1':
		return 'task_1'
        
	else:
		return ['task_2', 'task_3'] 
    
def common_func(**kwargs):
	print(kwargs['selected'])
    
python_branch_task = BranchPythonOperator(
	task_id ='python_branch_task',
	python_callable= select_random
)    
    
task_1 = PythonOperator(
	task_id = 'task_1',
	python_callable=common_func,
	op_kwargs= {'selected':'1'}
)
    
task_2 = PythonOperator(
	task_id = 'task_2',
	python_callable=common_func,
	op_kwargs= {'selected':'2'}
)
    
task_3 = PythonOperator(
	task_id = 'task_3',
	python_callable=common_func,
	op_kwargs={'selected':'3'}
)
    
python_branch_task >>[task_1,task_2,task_3]
 
 
 

2. @task.brach Decorator

아래 2개만 유의하면 위 와 같음 .
@task.branch(task_id = 'python_branch_task')
select_random() >>[task_1 , task_2, task_3]


@task.branch(task_id = 'python_branch_task')
def select_random():
	from random import choice
	item_lst = ['1','2','3']
	selected_item = choice(item_lst)
        
	if selected_item == '1':
		return 'task_1'
        
	else:
		return ['task_2', 'task_3'] 

3. BashBranchOperator 상속 개발

많이 안쓰여서 pass ... 나중에 필요하면 다시 공부핪

profile
다소Good한 데이터 엔지니어

0개의 댓글