분기 처리가 필요한 이유: 선행 Task의 결과나 조건에 따라 후행 Task 중 일부만 선택적으로 실행해야 하는 경우가 있기 때문이다. Airflow에서 분기를 구현하는 방법은 다음 세 가지다.
1) BranchPythonOperator
2) @task.branch 데코레이터
3) BaseBranchOperator를 상속하여 직접 개발
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
with DAG(
    dag_id='dags_branch_python_operator',
    start_date=pendulum.datetime(2023, 4, 1, tz='Asia/Seoul'),
    schedule='0 1 * * *',  # cron: every day at 01:00
    catchup=False,
) as dag:

    def select_random():
        """Randomly decide which branch this DAG run follows.

        Draws one of 'A', 'B' or 'C'. A draw of 'A' names the single task id
        'task_a'; a draw of 'B' or 'C' names both 'task_b' and 'task_c'.
        The returned task id(s) are what BranchPythonOperator executes next.
        """
        import random

        drawn = random.choice(['A', 'B', 'C'])
        if drawn != 'A':
            # Returning a list lets a single branch fan out to several tasks.
            return ['task_b', 'task_c']
        return 'task_a'

    python_branch_task = BranchPythonOperator(
        task_id='python_branch_task',
        python_callable=select_random,  # callable holding the branching logic
    )

    def common_func(**kwargs):
        """Print the label handed to this task through op_kwargs['selected']."""
        label = kwargs['selected']
        print(label)

    # One downstream task per label, created in order a, b, c; op_kwargs is
    # forwarded to common_func as keyword arguments.
    task_a, task_b, task_c = (
        PythonOperator(
            task_id=target_task_id,
            python_callable=common_func,
            op_kwargs={'selected': target_label},
        )
        for target_task_id, target_label in (
            ('task_a', 'A'),
            ('task_b', 'B'),
            ('task_c', 'C'),
        )
    )

    python_branch_task >> [task_a, task_b, task_c]
from airflow import DAG
import pendulum
from datetime import datetime  # no longer used below; left in place in case other code relies on it
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id='dags_python_with_branch_decorator',
    # Timezone-aware start date, consistent with the other DAGs in this file.
    # A naive datetime(2024, 6, 17) would be interpreted in Airflow's
    # configured default timezone rather than Asia/Seoul.
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule=None,  # no automatic schedule; runs only when triggered
    catchup=False,
) as dag:

    @task.branch(task_id='python_branch_task')
    def select_random():
        """Randomly choose the branch to follow (TaskFlow branching).

        Draws one of 'A', 'B' or 'C'. Returns 'task_a' for 'A', and the list
        ['task_b', 'task_c'] for 'B' or 'C', so both of those tasks run.
        """
        import random

        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        # Only 'B' or 'C' can remain here. The former `elif` left an implicit
        # `return None` path that could never run, so the fallback is explicit.
        return ['task_b', 'task_c']

    def common_func(**kwargs):
        """Print the value passed via op_kwargs['selected']."""
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected': 'A'},
    )
    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected': 'B'},
    )
    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected': 'C'},
    )

    # Calling the decorated function creates the branch task; wire it upstream
    # of all three candidates.
    select_random() >> [task_a, task_b, task_c]
from airflow import DAG
import pendulum
from airflow.operators.branch import BaseBranchOperator
from airflow.operators.python import PythonOperator
class CustomBranchOperator(BaseBranchOperator):
    """Branch operator whose routing decision is a random draw.

    choose_branch returns the task id(s) to follow: 'task_a' when 'A' is
    drawn, otherwise both 'task_b' and 'task_c'.
    """

    def choose_branch(self, context):
        import random

        # Print the runtime context passed by Airflow so it can be inspected.
        print(context)
        drawn = random.choice(['A', 'B', 'C'])
        return 'task_a' if drawn == 'A' else ['task_b', 'task_c']


with DAG(
    dag_id='dags_base_branch_operator',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule=None,
    catchup=False,
) as dag:
    custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')

    def common_func(**kwargs):
        """Print the label supplied through op_kwargs['selected']."""
        label = kwargs['selected']
        print(label)

    task_a = PythonOperator(
        task_id='task_a', python_callable=common_func, op_kwargs={'selected': 'A'}
    )
    task_b = PythonOperator(
        task_id='task_b', python_callable=common_func, op_kwargs={'selected': 'B'}
    )
    task_c = PythonOperator(
        task_id='task_c', python_callable=common_func, op_kwargs={'selected': 'C'}
    )

    custom_branch_operator >> [task_a, task_b, task_c]