[Airflow] Branching a DAG schedule by day of the week

sosimeow · September 12, 2023

Data Engineering


When you schedule an ML training pipeline, you sometimes want (or need) to run different tasks depending on the day of the week.

Airflow's built-in operators make this simple to implement.


BranchDayOfWeekOperator

BranchDayOfWeekOperator branches a workflow based on the day of the week (the week day value).
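Conceptually, the operator makes a membership test. It checks whether the current day of the week is in `week_day`. If it is, the operator follows `follow_task_ids_if_true`. Otherwise it follows `follow_task_ids_if_false`, and the branch that was not chosen is skipped. The dependency-free sketch below models only that decision. `pick_branch` is a hypothetical name. Representing days as ISO weekday numbers is an assumption of this sketch and is not a statement about Airflow's internals.

```python
from __future__ import annotations

from datetime import date


def pick_branch(day: date, week_days: set[int], if_true: str, if_false: str) -> str:
    """Return the task id that would be followed on `day`.

    Hypothetical helper, not part of Airflow. `week_days` holds ISO weekday
    numbers (Monday=1 ... Sunday=7).
    """
    return if_true if day.isoweekday() in week_days else if_false


# 2023-09-12 (this post's date) was a Tuesday, so the weekend branch is not taken.
print(pick_branch(date(2023, 9, 12), {6, 7}, "branch_weekend", "branch_mid_week"))
# → branch_mid_week
```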


Basic Format

  • task_id : the task id of the BranchDayOfWeekOperator itself
  • follow_task_ids_if_true / follow_task_ids_if_false : the id of the task to run when the day matches week_day / the id of the task to run when it does not
  • week_day : the day(s) of the week to compare against
    e.g. a single day: WeekDay.SATURDAY; weekends: {WeekDay.SATURDAY, WeekDay.SUNDAY}
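from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay

# Task ids below are placeholders; use the ids of your own downstream tasks.
BranchDayOfWeekOperator(
    task_id="weekday_branch",                  # id of this branching task
    follow_task_ids_if_true="task_if_match",   # runs when the day matches week_day
    follow_task_ids_if_false="task_if_other",  # runs on every other day
    week_day=WeekDay.SATURDAY,                 # or {WeekDay.SATURDAY, WeekDay.SUNDAY}
)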
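`week_day` takes a single `WeekDay`, a set of them, or a day name as a string. The docs example in this post passes "Monday", for instance. Below is a rough stdlib model of how such mixed inputs could be normalized into one set of day numbers. It is a sketch, not Airflow's code. `WeekDay` here is a local stand-in modelled on `airflow.utils.weekday.WeekDay`, which is an IntEnum with MONDAY=1 ... SUNDAY=7. `normalize_week_day` is a hypothetical helper.

```python
from __future__ import annotations

import enum
from collections.abc import Iterable


class WeekDay(enum.IntEnum):
    # Local stand-in modelled on airflow.utils.weekday.WeekDay (ISO numbering).
    MONDAY = 1
    TUESDAY = 2
    WEDNESDAY = 3
    THURSDAY = 4
    FRIDAY = 5
    SATURDAY = 6
    SUNDAY = 7


def normalize_week_day(week_day: str | WeekDay | Iterable[str | WeekDay]) -> set[int]:
    """Turn a day name, a WeekDay, or an iterable of either into ISO day numbers."""
    # Wrap a single value so single values and collections share one loop.
    if isinstance(week_day, (str, WeekDay)):
        week_day = [week_day]
    days = set()
    for day in week_day:
        if isinstance(day, WeekDay):
            days.add(int(day))
        else:
            # In this sketch, names match case-insensitively ("Monday", "SUNDAY").
            days.add(int(WeekDay[day.upper()]))
    return days


print(normalize_week_day("Monday"))  # → {1}
print(sorted(normalize_week_day({WeekDay.SATURDAY, WeekDay.SUNDAY})))  # → [6, 7]
```

Reducing every input form to one set of integers means the branch decision itself only needs a single `in` check.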

Usage example from the Airflow docs

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay

# The operators must be created inside a DAG so the scheduler picks them up.
with DAG(
    dag_id="example_weekday_branch_operator",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    empty_task_1 = EmptyOperator(task_id="branch_true")
    empty_task_2 = EmptyOperator(task_id="branch_false")
    empty_task_3 = EmptyOperator(task_id="branch_weekend")
    empty_task_4 = EmptyOperator(task_id="branch_mid_week")

    # week_day given as a day-name string
    branch = BranchDayOfWeekOperator(
        task_id="make_choice",
        follow_task_ids_if_true="branch_true",
        follow_task_ids_if_false="branch_false",
        week_day="Monday",
    )

    # week_day given as a set of WeekDay values
    branch_weekend = BranchDayOfWeekOperator(
        task_id="make_weekend_choice",
        follow_task_ids_if_true="branch_weekend",
        follow_task_ids_if_false="branch_mid_week",
        week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
    )

    # Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
    branch >> [empty_task_1, empty_task_2]
    # Run empty_task_3 if it's a weekend, empty_task_4 otherwise
    empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]
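You can trace in plain Python which leaf task each day ends up in after the two chained branches. The sketch below models only the dependency graph above. The task ids and day sets are copied from the example, and `trace` is a hypothetical helper. In Airflow itself, the tasks that are not selected are marked as skipped rather than simply left out.

```python
from __future__ import annotations

from datetime import date, timedelta

MONDAY = {1}     # ISO weekday numbers (an assumption of this sketch)
WEEKEND = {6, 7}


def trace(day: date) -> list[str]:
    """Return the task ids that would run on `day`, in execution order."""
    path = ["make_choice"]
    if day.isoweekday() in MONDAY:
        path.append("branch_true")
        return path
    # Not Monday: branch_false runs, then the weekend/mid-week branch decides.
    path += ["branch_false", "make_weekend_choice"]
    path.append("branch_weekend" if day.isoweekday() in WEEKEND else "branch_mid_week")
    return path


start = date(2023, 9, 11)  # a Monday
for offset in range(7):
    day = start + timedelta(days=offset)
    print(day.isoformat(), " -> ".join(trace(day)))
```

Only Monday stops at branch_true. Tuesday through Friday end at branch_mid_week, and Saturday and Sunday end at branch_weekend.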

Using it to trigger other DAGs

You can also branch on which DAG gets run. Point each branch at a TriggerDagRunOperator and set its trigger_dag_id parameter, as below.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay

df_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

# Each procedure is a Python script under $HOME.
CMD = 'python $HOME/{filename}'


def get_op(filename):
    """Build a BashOperator that runs $HOME/<filename>."""
    return BashOperator(
        task_id=filename,
        queue="queue",  # worker queue to route this task to
        bash_command=CMD.format(filename=filename),
    )


with DAG(
    'first_proc',  # dag_id may not contain spaces
    default_args=df_args,
    description="first proc",
    schedule_interval=None,  # runs only when triggered
    start_date=datetime(2023, 9, 12),
) as dag:

    task0 = get_op("procedure_0.py")
    task1 = get_op("procedure_1.py")

    # Sunday -> task_Sunday, Monday to Saturday -> task_Mon_Sat
    br_op = BranchDayOfWeekOperator(
        task_id="decision_execution",
        follow_task_ids_if_true="task_Sunday",
        follow_task_ids_if_false="task_Mon_Sat",
        week_day=WeekDay.SUNDAY,
        use_task_execution_day=False,  # compare with the day the task actually runs
    )

    # Each branch triggers a separate DAG with the given dag_id.
    task_Sunday = TriggerDagRunOperator(
        task_id="task_Sunday",
        trigger_dag_id="Sunday_dag",
    )

    task_Mon_Sat = TriggerDagRunOperator(
        task_id="task_Mon_Sat",
        trigger_dag_id="Mon_Sat_dag",
    )

    task0 >> task1 >> br_op >> [task_Sunday, task_Mon_Sat]
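The example sets use_task_execution_day=False. With that setting, the weekday check uses the system's current day when the task runs. With True, it uses the run's logical (execution) date instead, which matters for backfills and delayed runs. The sketch below models only that choice. `reference_day` is a hypothetical helper. The wall-clock time is passed in explicitly, so this sketch ignores timezone handling entirely.

```python
from __future__ import annotations

from datetime import datetime


def reference_day(logical_date: datetime, now: datetime, use_task_execution_day: bool) -> int:
    """Return the ISO weekday (Mon=1 ... Sun=7) the branch decision is based on."""
    # True: the run's logical date, which stays the same across retries and backfills.
    # False: the wall-clock time at which the task actually executes.
    return (logical_date if use_task_execution_day else now).isoweekday()


# A run whose logical date is Sunday 2023-09-10 but which executes on Tuesday 2023-09-12.
logical = datetime(2023, 9, 10, 0, 0)
actual = datetime(2023, 9, 12, 9, 30)
print(reference_day(logical, actual, use_task_execution_day=True))   # → 7 (task_Sunday)
print(reference_day(logical, actual, use_task_execution_day=False))  # → 2 (task_Mon_Sat)
```

With schedule_interval=None, as in the example, a manually triggered run's logical date is normally its trigger time, so the two settings usually agree. They mainly diverge when a run executes well after its logical date.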
    
