[Airflow] branch opeartor랑 trigger_rule로 복잡한 DAG 의존성 설정하기

Yeolsim's logs·2024년 2월 19일
0
post-thumbnail

dag 요구사항

  • 쿼리 결과에 따라 4가지의 서로 다른 task루틴으로 분기 되어야함
  • 각 하위 task루틴들 중 겹치는 것도 존재함

branch operator

함수작성

  • pymysql을 이용하여 쿼리 결과에 따라 다른 task루틴을 실행하도록 함수를 작성
  • 여기서 return되는 task들은 전부 dummy operator
  • return되는 task를 제외한 나머지 task와 해당 task들의 하위 작업들까지도 전부 skipped됨
def check_execution_time():
    current_time = dt.utcnow()
    con=Connect_DB()
    cur=con.cursor()
    
    useyn_1="쿼리1"
    cur.execute(useyn_1)
    hour_useyn_1 = cur.fetchall()
    hour_useyn_1 = pd.DataFrame(hour_useyn_1, columns=["YF_UDT_H"])
    hour_useyn_1['YF_UDT_H'] = hour_useyn_1['YF_UDT_H'].astype('int64')
    hour_useyn_1=list(hour_useyn_1["YF_UDT_H"])
    
    useyn_6="쿼리2"
    cur.execute(useyn_6)
    hour_useyn_6 = cur.fetchall()
    hour_useyn_6 = pd.DataFrame(hour_useyn_6, columns=["YF_UDT_H"])
    hour_useyn_6['YF_UDT_H'] = hour_useyn_6['YF_UDT_H'].astype('int64')
    hour_useyn_6=list(hour_useyn_6["YF_UDT_H"])
    

    if current_time.hour in hour_useyn_1 and current_time.hour in hour_useyn_6:
        return 'cal_all'
    
    elif current_time.hour not in hour_useyn_1 and current_time.hour not in hour_useyn_6:
        raise AirflowSkipException(f"YF_UDT_H={current_time.hour}시에 해당하는 자산이 없습니다.")
    
    elif current_time.hour in hour_useyn_1 and current_time.hour not in hour_useyn_6:
        return 'only_cal_useyn_1'
    
    else:
        return 'only_cal_useyn_6'

branch operator task작성

from airflow.operators.python import BranchPythonOperator

execute_branch_task = BranchPythonOperator(
    task_id='check_execution_time',
    python_callable=check_execution_time,
    provide_context=True,
    dag=dag,
)

의존성 설정

  • 아래와 같이 하위작업 목록을 설정해주면 된다.
execute_branch_task>>[cal_all,only_cal_useyn_1,only_cal_useyn_6]

cal_all>>[price_task,price_useyn_2to3and6_task]>>price_to_mongoDB>>use_yn_6_risk>>useyn_1_dummy
only_cal_useyn_6>>price_task>>price_to_mongoDB>>use_yn_6_risk
only_cal_useyn_1>>[price_task,price_useyn_2to3and6_task]>>price_to_mongoDB>>useyn_1_dummy

하위task에 trigger rule설정

  • 위 그래프를 보면 각 task별 하위작업들이 일부 겹치는걸 확인 할 수 있는데,
    만약 branch operator가 cal_all을 return할 경우 나머지 task(only_cal_useyn1,only_cal_useyn_6)에 포함된 하위 task들이 전부 skip된다.
  • 이런 문제를 막기 위해 하위task들에 trigger rule을 설정해줘야한다.

    https://airflow.apache.org/docs/apache-airflow/1.10.9/concepts.html

- airflow docs를 보면 여러가지 trigger rule 파라미터가 있는데, 나같은 경우 한개의 상위작업이라도 성공할시 task를 실행하는 one_success를 적용했더니 요구사항에 맞게 의존성이 잘 설정되었다.

잘 설정된줄 알았는데 오류가 나서 보니깐 one_success는 상위 테스크의 완료를 기다려주지 않고 실행되는 문제가 있었다.(it does not wait for all parents to be done)
그래서 의존성을 고려해서 trigger_rule='none_failed_min_one_success'로 수정했더니 예상했던 순서로 task가 잘 실행됨을 확인했다.

    task_id='price_to_mongoDB',
    python_callable=to_mongoDB,
    op_kwargs={'cond':'<30000000'},
    dag=dag,
    trigger_rule='one_success'
)

0개의 댓글