[프로그래머스] 데브코스 데이터엔지니어링 TIL Day 51

주재민·2024년 1월 1일
0
post-thumbnail

📖 학습주제

Airflow의 다양한 고급 기능과 CI/CD 환경 설정에 대해 학습 (1)


Summary 테이블 구현

간단한 DAG 구현

Build_Summary.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id = 'redshift_dev_db')
    return hook.get_conn().cursor()


def execSQL(**context):

    schema = context['params']['schema'] 
    table = context['params']['table']
    select_sql = context['params']['sql']

    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    cur = get_Redshift_connection()

    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)

    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
        sql += "COMMIT;"
        logging.info(sql)
        cur.execute(sql)
    except Exception as e:
        cur.execute("ROLLBACK")
        logging.error('Failed to sql. Completed ROLLBACK!')
        raise AirflowException("")


dag = DAG(
    dag_id = "Build_Summary",
    start_date = datetime(2021,12,10),
    schedule = '@once',
    catchup = False
)

execsql = PythonOperator(
    task_id = 'mau_summary',
    python_callable = execSQL,
    params = {
        'schema' : 'keeyong',
        'table': 'mau_summary',
        'sql' : """SELECT 
  TO_CHAR(A.ts, 'YYYY-MM') AS month,
  COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1 
;"""
    },
    dag = dag
)

사용자별 channel 정보 요약

비슷하게 PythonOperator를 만들고 아래처럼 params 파라미터를 설정

params = {
 'schema' : 'keeyong',
 'table': 'channel_summary',
 'sql' : """SELECT
 DISTINCT A.userid,
 FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and
unbounded following) AS First_Channel,
 LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and
unbounded following) AS Last_Channel
 FROM raw_data.user_session_channel A
 LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;"""
 }

CTAS 부분을 아예 별도의 환경설정 파일로

  • 환경 설정 중심의 접근 방식
    - config 폴더를 생성
    - 그 안에 써머리 테이블별로 하나의 환경설정 파일 생성 (파이썬 dictionary 형태로 유지할 것이라 .py 확장자를 가져야함)
  • 이렇게 하면 비개발자들이 사용할 때 어려움을 덜 느끼게 됨
  • 그러면서 더 다양한 테스트를 추가

Slack 연동

어느 Workspace의 어느 Channel로 보낼 것인지 결정

  • 임의의 Workspace 밑에 DataAlert이라는 App 생성
  • 이 App이 #data-alert이라는 채널에 메세지를 보낼 수 있게 설정

데이터 파이프라인 문제를 슬랙에 표시

https://api.slack.com/messaging/webhooks
해당 링크를 따라 Incoming Webhooks App을 생성

Create your Slack app -> From scratch 선택 -> App의 이름(여기서는 DataAlert)을 작성하고 Workspace 선택 후 Create App -> Incoming Webhooks 선택 후 Activate Incoming Webhooks을 on으로 -> Add New Webhook to Workspace 누르고 #data-alert채널 선택 (Webhook URL Copy)

Webhook URL에서 https://hooks.slack.com/services/ 뒤의 부분은 이후에 필요하게 됨

앞서 Webhook으로 메세지 보내기

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' 복사한 Webhook URL
-> 슬랙 채널에 Hello, World! 메세지를 보냄

데이터 파이프라인 실패/경고를 슬랙으로 보내는 방법

  • Webhook URL에서 https://hooks.slack.com/services/ 뒤의 부분을 “slack_url” Variable로 저장
  • slack에 에러 메세지를 보내는 별도 모듈로 개발(slack.py)
  • 이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정

0개의 댓글