[airflow] Running tasks conditionally with a branch operator

오현우 · June 13, 2022

Branch operator

Suppose that, while designing an Airflow DAG, the value you want to fetch does not exist and you want to run a different task in that case. How should the DAG be designed?

Types of branch operators

BranchSQLOperator: Branches based on whether a given SQL query returns true or false
BranchDayOfWeekOperator: Branches based on whether the current day of week is equal to a given week_day parameter
BranchDateTimeOperator: Branches based on whether the current time is between target_lower and target_upper times
All of these operators take follow_task_ids_if_true and follow_task_ids_if_false parameters, which list the task(s) to follow depending on whether the operator's condition evaluates to true or false.
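The true/false contract shared by these operators can be sketched in plain Python. This is not Airflow's implementation; `pick_branch`, `is_day_of_week`, and the task ids `weekend_report` and `weekday_report` are hypothetical names used only to show how a boolean condition maps onto follow_task_ids_if_true and follow_task_ids_if_false.

```python
from datetime import date


def pick_branch(condition, follow_task_ids_if_true, follow_task_ids_if_false):
    """Return the task ids to follow for a boolean condition (illustrative only)."""
    chosen = follow_task_ids_if_true if condition else follow_task_ids_if_false
    # Accept a single task id or a list of them, and always hand back a list.
    return [chosen] if isinstance(chosen, str) else list(chosen)


def is_day_of_week(day, week_day):
    """True when `day` falls on `week_day` (0 = Monday ... 6 = Sunday)."""
    return day.weekday() == week_day


# 2022-06-13 was a Monday, so an "is it Saturday?" check is false.
monday = date(2022, 6, 13)
print(pick_branch(is_day_of_week(monday, 5), "weekend_report", "weekday_report"))
# ['weekday_report']
```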

ShortCircuitOperator: Like BranchPythonOperator (used in the example below), this operator takes a Python callable, but the callable returns True or False based on logic implemented for your use case. If True is returned, the DAG will continue, and if False is returned, all downstream tasks will be skipped.

Implementing a branch operator

Let's implement a DAG that judges a model accurate as soon as its accuracy exceeds 5.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

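def _choose_best_model(ti):
    # Pull the accuracy that each training task pushed to XCom.
    accuracies = ti.xcom_pull(key="model_accuracy", task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    models = ["a", "b", "c"]

    # Take the first model whose accuracy exceeds 5 as the best model.
    for model, accuracy in zip(models, accuracies):
        if accuracy > 5:
            ti.xcom_push(key="best_model", value=model)
            return "accurate"

    # Fall back to "inaccurate" only after every model has been checked.
    # (Returning it inside the loop would stop after the first model.)
    return "inaccurate"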


with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    downloading_data = BashOperator(
        do_xcom_push=False,
        task_id='downloading_data',
        bash_command='sleep 3'
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_best_model = BranchPythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model
    )

    accurate = DummyOperator(
        task_id='accurate'
    )
    inaccurate = DummyOperator(
        task_id='inaccurate'
    )
    


    downloading_data >> processing_tasks >> choose_best_model

    choose_best_model >> [accurate, inaccurate]

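We treat the first model whose accuracy exceeds 5 as the best model, store its name through XCom under the key best_model, and return accurate because a model above 5 exists. If no model exceeds 5, the callable returns inaccurate.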
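The first-match rule described above can be checked outside Airflow with a stand-in task instance. This is a minimal sketch: `FakeTaskInstance` is a hypothetical test double that only mimics the xcom_pull and xcom_push calls used here, and `choose_best_model` restates the selection logic with shortened task ids.

```python
class FakeTaskInstance:
    """Stand-in for Airflow's task instance, for local testing only."""

    def __init__(self, pulled):
        self.pulled = pulled  # values that xcom_pull should return
        self.pushed = {}      # records every xcom_push call

    def xcom_pull(self, key, task_ids):
        return self.pulled

    def xcom_push(self, key, value):
        self.pushed[key] = value


def choose_best_model(ti, threshold=5):
    accuracies = ti.xcom_pull(key="model_accuracy", task_ids=["a", "b", "c"])
    for model, accuracy in zip(["a", "b", "c"], accuracies):
        if accuracy > threshold:
            ti.xcom_push(key="best_model", value=model)
            return "accurate"
    # Reached only when no model cleared the threshold.
    return "inaccurate"


ti = FakeTaskInstance([3.0, 7.2, 9.1])
print(choose_best_model(ti), ti.pushed)  # accurate {'best_model': 'b'}
```

Model a falls below the threshold here, so the loop moves on and picks b, the first model above 5.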

In this way, BranchPythonOperator lets you run only the branch that matches the condition.

Tip

If the callable returns a list of task ids instead of a single one, every task in that list is run.

Reference
https://www.astronomer.io/guides/airflow-branch-operator/
