airflow 2.0.2 버전에서는 이상 없이 import 되었던 mysql 모듈이 2.4.3 버전으로 업그레이드 하자 import 에러가 발생하였습니다. 그 해결 과정을 기술합니다.
airflow 가이드에서의 MySqlHook 모듈을 복사하여 mysqlHook.py 파일로 dag폴더에 넣었습니다.
필요한 requirements.txt는 아래와 같습니다.
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt"
apache-airflow-providers-common-sql
pymysql
from mysqlHook import MySqlHook
from airflow.operators.python import PythonOperator
from alert import on_failure
from date_handler import get_report_date
default_args = {
'owner' : 'airflow',
'start_date': datetime(2022, 1, 26),
'on_failure_callback': on_failure(conn_id="messenger_url")
}
default_op_args = {
'account_id' : 'all',
'start_date' : 1,
'end_date' : 1
}
dag = DAG(
dag_id="data_collector",
default_args=default_args,
schedule_interval='00 04 * * *',
tags=["collector"],
catchup=False
)
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)
def fetch_records(**context):
account_id = ""
start_date = ""
end_date = ""
report = []
dag_run: DagRun = context['dag_run']
for env_var in ["account_id", "start_date", "end_date"]:
# dag_run.conf 값이 있을 경우 해당 값 사용
if dag_run.conf is not None and dag_run.conf.__contains__(env_var):
if env_var == "account_id":
account_id = dag_run.conf[env_var]
elif env_var == "start_date":
start_date = dag_run.conf[env_var]
elif env_var == "end_date":
end_date = dag_run.conf[env_var]
# dag_run.conf 값이 없을 경우 default_args 값 사용
else:
if env_var == "account_id":
account_id = context[env_var]
elif env_var == "start_date":
start_date = context[env_var]
elif env_var == "end_date":
end_date = context[env_var]
request = '''
SELECT a.acnt_id
FROM collector.tb_acnt a
INNER JOIN collector.tb_media b
ON a.media_acnt_id = b.media_acnt_id
WHERE b.media_cd = '1'
'''
if account_id != "all":
request += f"and a.media_acnt_id = '{account_id}'"
mysql_hook = MySqlHook(mysql_conn_id = 'collector-api-info')
conn = mysql_hook.get_conn()
cursor = conn.cursor()
cursor.execute(request)
accounts = {"account_id" : [item[0] for item in cursor.fetchall()]}
print(f"accounts : {accounts}")
op_vars = {}
op_vars["account_id"] = accounts["account_id"]
op_vars["start_date"] = start_date
op_vars["end_date"] = end_date
env_js = json.dumps(op_vars)
Variable.set('collector_env_vars', env_js)
conn.close()
accounts_db_call = PythonOperator(
task_id = 'accounts_db_call',
python_callable = fetch_records,
provide_context=True,
op_kwargs = default_op_args,
dag = dag
)