[Airflow] mysql hook 모듈 import

남영민·2023년 3월 8일
0

airflow 2.0.2 버전에서는 이상 없이 import 되었던 mysql 모듈이 2.4.3 버전으로 업그레이드 하자 import 에러가 발생하였습니다. 그 해결 과정을 기술합니다.

  1. airflow 가이드에서의 MySqlHook 모듈을 복사하여 mysqlHook.py 파일로 dag폴더에 넣었습니다.

  2. 필요한 requirements.txt는 아래와 같습니다.

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt" 
apache-airflow-providers-common-sql
pymysql
  1. Dag 코드는 아래와 같습니다.
  • default_op_args 변수를 사용하여 기본 변수를 활용하는 방식과 dag_run.conf를 사용하여 기본 변수를 바꿔주는 방식도 포함되어 있습니다.
from mysqlHook import MySqlHook
from airflow.operators.python import PythonOperator
from alert import on_failure
from date_handler import get_report_date

default_args = {
    'owner' : 'airflow',
    'start_date': datetime(2022, 1, 26),
    'on_failure_callback': on_failure(conn_id="messenger_url")
}

default_op_args = {
    'account_id' : 'all',
    'start_date' : 1,
    'end_date' : 1
}

dag = DAG(
    dag_id="data_collector",
    default_args=default_args,
    schedule_interval='00 04 * * *',
    tags=["collector"],
    catchup=False
)

start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

def fetch_records(**context):
    account_id = ""
    start_date = ""
    end_date = ""
    report = []

    dag_run: DagRun = context['dag_run']
    for env_var in ["account_id", "start_date", "end_date"]:
    	# dag_run.conf 값이 있을 경우 해당 값 사용
        if dag_run.conf is not None and dag_run.conf.__contains__(env_var):
            if env_var == "account_id":
                account_id = dag_run.conf[env_var]
            elif env_var == "start_date":
                start_date = dag_run.conf[env_var]
            elif env_var == "end_date":
                end_date = dag_run.conf[env_var]
        # dag_run.conf 값이 없을 경우 default_args 값 사용
        else:
            if env_var == "account_id":
                account_id = context[env_var]
            elif env_var == "start_date":
                start_date = context[env_var]
            elif env_var == "end_date":
                end_date = context[env_var]

    request = '''
        SELECT a.acnt_id
        FROM collector.tb_acnt a
        INNER JOIN collector.tb_media b
        ON a.media_acnt_id = b.media_acnt_id
        WHERE b.media_cd = '1'
        '''
    if account_id != "all":
        request += f"and a.media_acnt_id = '{account_id}'"
    mysql_hook = MySqlHook(mysql_conn_id = 'collector-api-info')
    conn = mysql_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(request)
    accounts = {"account_id" : [item[0] for item in cursor.fetchall()]}
    print(f"accounts : {accounts}")

    op_vars = {}
    op_vars["account_id"] = accounts["account_id"]
    op_vars["start_date"] = start_date
    op_vars["end_date"] = end_date

    env_js = json.dumps(op_vars)

    Variable.set('collector_env_vars', env_js)
    conn.close()
    
accounts_db_call = PythonOperator(
    task_id = 'accounts_db_call',
    python_callable = fetch_records,
    provide_context=True,
    op_kwargs = default_op_args,
    dag = dag
)
profile
성장하는 개발자

0개의 댓글