ML 학습 파이프라인 스케줄을 짜다 보면, 요일마다 다른 task 를 실행하고 싶을 때(또는 실행해야 할 때)가 있다.
이 때 Airflow 내장 Operator 를 사용하면 간단하게 구현할 수 있다.
BranchDayOfWeekOperator 를 사용하면 요일(week day value)을 기준으로 workflow 를 분기시킬 수 있다.
task_id
: BranchDayOfWeekOperator 의 task id
follow_task_ids_if_true / follow_task_ids_if_false
: 실행 시점의 요일이 week_day 에 지정한 week_day_value 와 일치하면 실행되는 task 의 id / 일치하지 않으면 실행되는 task 의 id
week_day
: 실행 기준이 되는 요일 값
from airflow.operators.weekday import BranchDayOfWeekOperator
# Usage template: every argument value below is a placeholder to be replaced.
BranchDayOfWeekOperator(
# Id of the branching task itself.
task_id="task id",
# Id of the task that runs when the weekday matches `week_day`.
follow_task_ids_if_true="task_id will be executed when the weekday value is True case",
# Id of the task that runs when the weekday does not match `week_day`.
follow_task_ids_if_false="task_id will be executed when the weekday value is False case",
# Weekday value used as the branching criterion (`week_day_value` is a
# placeholder name, not defined in this snippet).
week_day=week_day_value,
)
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay
# NOTE(review): no DAG is visible in this snippet; presumably it is meant to
# live inside a `with DAG(...)` block -- confirm before running it as-is.

# Placeholder leaf tasks, one per possible branch outcome.
empty_task_1 = EmptyOperator(task_id="branch_true")
empty_task_2 = EmptyOperator(task_id="branch_false")
empty_task_3 = EmptyOperator(task_id="branch_weekend")
empty_task_4 = EmptyOperator(task_id="branch_mid_week")

# First decision: is it Monday?
branch = BranchDayOfWeekOperator(
    week_day="Monday",
    follow_task_ids_if_true="branch_true",
    follow_task_ids_if_false="branch_false",
    task_id="make_choice",
)

# Second decision: is it a weekend day (Saturday or Sunday)?
branch_weekend = BranchDayOfWeekOperator(
    week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
    follow_task_ids_if_true="branch_weekend",
    follow_task_ids_if_false="branch_mid_week",
    task_id="make_weekend_choice",
)

# The Monday check fans out to empty_task_1 (match) or empty_task_2 (no match).
branch.set_downstream([empty_task_1, empty_task_2])

# On a non-Monday, empty_task_2 leads into the weekend check, which fans out
# to empty_task_3 (weekend) or empty_task_4 (mid-week).
empty_task_2.set_downstream(branch_weekend)
branch_weekend.set_downstream([empty_task_3, empty_task_4])
아래와 같이 TriggerDagRunOperator 의 trigger_dag_id 파라미터를 활용하면, 요일에 따라 서로 다른 dag 를 실행하도록 분기시킬 수 있다.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay
# Arguments handed to the DAG as its `default_args`.
df_args = dict(
    owner="airflow",
    depends_on_past=False,
    retries=0,
)

# Shell command template; `{filename}` is replaced with a script name that is
# resolved relative to $HOME.
CMD = "python $HOME/{filename}"
def get_op(filename):
    """Build a BashOperator that runs one Python script located under $HOME.

    Args:
        filename: Name of the script to run; it is substituted into ``CMD``
            and also used verbatim as the task id.

    Returns:
        A BashOperator assigned to the ``"queue"`` queue whose command is
        ``CMD`` formatted with *filename*.
    """
    return BashOperator(
        task_id=filename,
        queue="queue",
        bash_command=CMD.format(filename=filename),
    )
# Run two preparation scripts in sequence, then trigger a different DAG
# depending on whether the branch check lands on a Sunday.
with DAG(
    # Ids may not contain spaces (alphanumerics, dashes, dots, underscores only).
    "first_proc",
    default_args=df_args,
    description="first proc",
    schedule_interval=None,
    # Written as 9, not 09: leading-zero integer literals are a syntax error.
    start_date=datetime(2023, 9, 12),
) as dag:
    task0 = get_op("procedure_0.py")
    task1 = get_op("procedure_1.py")

    # Chooses which of the two trigger tasks below is followed.
    br_op = BranchDayOfWeekOperator(
        task_id="decision_execution",
        follow_task_ids_if_true="task_Sunday",
        follow_task_ids_if_false="task_Mon_Sat",
        week_day=WeekDay.SUNDAY,
        # NOTE(review): per the Airflow docs, False compares against the
        # current system weekday rather than the run's execution date --
        # confirm for the Airflow version in use.
        use_task_execution_day=False,
    )

    # Followed when the weekday check matches Sunday.
    task_Sunday = TriggerDagRunOperator(
        task_id="task_Sunday",
        trigger_dag_id="Sunday_dag",
    )

    # Followed on every other day.
    task_Mon_Sat = TriggerDagRunOperator(
        task_id="task_Mon_Sat",
        trigger_dag_id="Mon_Sat_dag",
    )

    task0 >> task1 >> br_op >> [task_Sunday, task_Mon_Sat]
Ref