[5/23] TIL - DAG 실습, Airflow Backfill

Sangwon Jwa·2024년 5월 25일

데브코스 TIL

목록 보기
35/54
post-thumbnail

📖 학습 주제


  1. Open Weathermap API DAG
  2. Primary Key Uniqueness 보장하기
  3. Airflow Backfill

✏️ 주요 메모 사항 소개


Open Weather DAG

DAG 작성 실습을 더 해보기 위해서 이번엔 Open Weathermap API를 이용하여 DAG를 작성해보자.

  • Open Weathermap API : 위도 / 경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스, 무료 계정으로 api key를 받아서 이를 호출 시에 사용하면 된다.
    https://openweathermap.org/price

우리가 만드려는 DAG는 서울 8일 낮/최소/최대 온도를 가져오는 DAG를 만드려고 한다. 절차는 다음과 같다.

  1. API Key를 open_weather_api_key 라는 Variable로 저장
  2. 서울의 위도와 경도 찾기
  3. One-Call API를 사용 : https://openweathermap.org/api/one-call-api
    • 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
    • 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력
      • 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
request_url = https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={APIkey}&units=metric

1. Variable 추가

위의 Open Weather 링크에서 회원가입을 진행하고 API key를 발급받은 뒤 Airflow에서 Variable을 추가하자. Variable 이름은 open_weather_api_key로 만들어 주자.

2. 전체 코드

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("jwa4610", "weather_forecast")

Primary Key Uniqueness 보장하기

primary key란 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드를 말한다. 보통 이를 CREATE TABLE 사용 시 지정한다. 관계형 DB에서는 Primary Key의 값이 중복 존재하는 것을 시스템적으로 막아주지만, 데이터 웨어하우스 시스템에서는 이를 보장해주지 않는다. 그 이유는 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문이다.

앞서 만든 날씨 정보 테이블을 보자.

CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
  • 날씨 정보이기 대문에 최근 정보가 더 신뢰할 수 있다.
  • 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용하면 된다.
  • 즉 date이 같은 레코드들이 있다면 created_date를 기준으로 더 최근 정보를 선택하면 된다.

uniqueness를 보장하기 위해 ROW_NUMBER를 사용해서 테이블을 구성하자.

작업들을 다시 나열하면

  1. 이렇게 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
  2. 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사 (이 때 중복 존재 가능)
  3. 중복을 걸러주는 SQL 작성
    • 최신 레코드를 우선 순위로 선택
    • ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(timestamp)로 ordering(DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
  4. 위의 SQL을 바탕으로 최종 원본 테이블로 복사
    • 이때 원본 테이블에서 레코드들을 삭제
    • 임시 temp 테이블을 원본 테이블로 복사

이렇게 존재하는 Primary Key를 기준으로 존재하는 레코드라면 새 정보롤 수정하거나, 존재하지 않는 레코드라면 새 레코드로 적재하는 과정을 UPSERT라고 한다. 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원한다.


전체 코드

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()


@task
def etl(schema, table, lat, lon, api_key):

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    
    # 원본 테이블이 없다면 생성
    create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);"""
    logging.info(create_table_sql)

    # 임시 테이블 생성
    create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
    logging.info(create_t_sql)
    try:
        cur.execute(create_table_sql)
        cur.execute(create_t_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 임시 테이블 데이터 입력
    insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 기존 테이블 대체
    alter_sql = f"""DELETE FROM {schema}.{table};
      INSERT INTO {schema}.{table}
      SELECT date, temp, min_temp, max_temp FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM t
      )
      WHERE seq = 1;"""
    logging.info(alter_sql)
    try:
        cur.execute(alter_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise


with DAG(
    dag_id = 'Weather_to_Redshift_v2',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 4 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("jwa4610", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))

Airflow Backfill

관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 중요하다. 가능하면 Full Refresh를 사용하는 것이 좋지만, Incremental Update 시 실수등으로 데이터가 빠지는 일이 생길 수 있고, 과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야할 상황이 발생할 수 있다.

Backfill 이란 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미한다. Backfill 해결은 Incremental Update에서 복잡해지는데, Airflow에서는 실패한 데이터 파이프라인의 재실행이 잘 디자인되어 있기 때문에 Backfill 문제에서 용이하다.

Airflow의 접근방식은 다음과 같다

  • ETL 별로 실행날짜와 결과를 메타 데이터 데이터베이스에 기록
  • 모든 DAG 실행에는 "execution_date"가 지정되어 있음
    • execution_date으로 채워야하는 날짜와 시간이 넘어옴
  • 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함
  • 이렇게 하면 backfill이 쉬워진다.

0개의 댓글