
# Airflow의 다양한 고급 기능과 CI/CD 환경 설정에 대해 학습 (1)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
from airflow import AirflowException
import requests
import logging
import psycopg2
from airflow.exceptions import AirflowException
def get_Redshift_connection():
    """Open the Airflow connection 'redshift_dev_db' and return a live cursor.

    The caller is responsible for issuing COMMIT/ROLLBACK on this cursor.
    """
    redshift_hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = redshift_hook.get_conn()
    return conn.cursor()
def execSQL(**context):
    """Atomically (re)build a summary table in Redshift.

    Steps:
      1. CTAS the result of ``params['sql']`` into ``{schema}.temp_{table}``.
      2. Sanity-check that the temp table is non-empty.
      3. Swap the temp table into place (DROP old + RENAME) in one transaction.

    Expected ``context['params']`` keys: ``schema``, ``table``, ``sql``.

    Raises:
        ValueError: if the freshly built temp table has zero rows.
        AirflowException: if the final swap fails (after ROLLBACK).
    """
    schema = context['params']['schema']
    table = context['params']['table']
    select_sql = context['params']['sql']

    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    cur = get_Redshift_connection()

    # Build the candidate table under a temp name so a bad SELECT never
    # touches the live table.
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)

    # Guard: refuse to replace the live table with an empty result.
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        # Swap: drop the old table and rename temp into place, then COMMIT
        # so the whole swap is applied as one transaction.
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
        sql += "COMMIT;"
        logging.info(sql)
        cur.execute(sql)
    except Exception as e:
        cur.execute("ROLLBACK")
        logging.error('Failed to sql. Completed ROLLBACK!')
        # Fix: the original raised AirflowException("") — an empty message with
        # no cause, discarding all diagnostics. Chain the original error.
        raise AirflowException(f"Failed to swap {schema}.temp_{table} into {schema}.{table}") from e
# One-shot DAG that builds summary tables in Redshift.
dag = DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',   # run once; no backfill
    catchup=False,
)
# Task: rebuild keeyong.mau_summary (monthly active users) via execSQL.
execsql = PythonOperator(
    task_id='mau_summary',
    python_callable=execSQL,
    params={
        'schema': 'keeyong',
        'table': 'mau_summary',
        'sql': """SELECT
TO_CHAR(A.ts, 'YYYY-MM') AS month,
COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
;""",
    },
    dag=dag,
)
# 비슷하게 PythonOperator를 만들고 아래처럼 params 파라미터를 설정
# Example `params` for a second task: per-user first/last attribution channel.
params = {
    'schema': 'keeyong',
    'table': 'channel_summary',
    'sql': """SELECT
DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and
unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and
unbounded following) AS Last_Channel
FROM raw_data.user_session_channel A
LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;""",
}
# --- Slack 알림 설정 메모 (구현 파일은 .py 확장자를 가져야 함, 예: slack.py) ---
# https://api.slack.com/messaging/webhooks 링크를 따라 Incoming Webhooks App을 생성:
#   Create your Slack app -> From scratch 선택 -> App 이름(여기서는 DataAlert) 작성, Workspace 선택 후 Create App
#   -> Incoming Webhooks 선택 후 Activate Incoming Webhooks를 on으로
#   -> Add New Webhook to Workspace 누르고 #data-alert 채널 선택 (Webhook URL 복사)
# Webhook URL에서 https://hooks.slack.com/services/ 뒤의 부분은 이후에 필요함.
# 테스트:
#   curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' <복사한 Webhook URL>
#   -> 슬랙 채널에 Hello, World! 메세지를 보냄
# https://hooks.slack.com/services/ 뒤의 부분을 "slack_url" Variable로 저장 (slack.py에서 사용)