def check_execution_time():
current_time = dt.utcnow()
con=Connect_DB()
cur=con.cursor()
useyn_1="쿼리1"
cur.execute(useyn_1)
hour_useyn_1 = cur.fetchall()
hour_useyn_1 = pd.DataFrame(hour_useyn_1, columns=["YF_UDT_H"])
hour_useyn_1['YF_UDT_H'] = hour_useyn_1['YF_UDT_H'].astype('int64')
hour_useyn_1=list(hour_useyn_1["YF_UDT_H"])
useyn_6="쿼리2"
cur.execute(useyn_6)
hour_useyn_6 = cur.fetchall()
hour_useyn_6 = pd.DataFrame(hour_useyn_6, columns=["YF_UDT_H"])
hour_useyn_6['YF_UDT_H'] = hour_useyn_6['YF_UDT_H'].astype('int64')
hour_useyn_6=list(hour_useyn_6["YF_UDT_H"])
if current_time.hour in hour_useyn_1 and current_time.hour in hour_useyn_6:
return 'cal_all'
elif current_time.hour not in hour_useyn_1 and current_time.hour not in hour_useyn_6:
raise AirflowSkipException(f"YF_UDT_H={current_time.hour}시에 해당하는 자산이 없습니다.")
elif current_time.hour in hour_useyn_1 and current_time.hour not in hour_useyn_6:
return 'only_cal_useyn_1'
else:
return 'only_cal_useyn_6'
from airflow.operators.python import BranchPythonOperator
execute_branch_task = BranchPythonOperator(
task_id='check_execution_time',
python_callable=check_execution_time,
provide_context=True,
dag=dag,
)
execute_branch_task>>[cal_all,only_cal_useyn_1,only_cal_useyn_6]
cal_all>>[price_task,price_useyn_2to3and6_task]>>price_to_mongoDB>>use_yn_6_risk>>useyn_1_dummy
only_cal_useyn_6>>price_task>>price_to_mongoDB>>use_yn_6_risk
only_cal_useyn_1>>[price_task,price_useyn_2to3and6_task]>>price_to_mongoDB>>useyn_1_dummy
- airflow docs를 보면 여러가지 trigger rule 파라미터가 있는데, 나같은 경우 한개의 상위작업이라도 성공할시 task를 실행하는 one_success를 적용했더니 요구사항에 맞게 의존성이 잘 설정되었다.
잘 설정된줄 알았는데 오류가 나서 보니깐 one_success는 상위 테스크의 완료를 기다려주지 않고 실행되는 문제가 있었다.(it does not wait for all parents to be done)
그래서 의존성을 고려해서 trigger_rule='none_failed_min_one_success'로 수정했더니 예상했던 순서로 task가 잘 실행됨을 확인했다.
task_id='price_to_mongoDB',
python_callable=to_mongoDB,
op_kwargs={'cond':'<30000000'},
dag=dag,
trigger_rule='one_success'
)