
- Open Weathermap API DAG
- Primary Key Uniqueness 보장하기
- Airflow Backfill
DAG 작성 실습을 더 해보기 위해서 이번엔 Open Weathermap API를 이용하여 DAG를 작성해보자.
- Open Weathermap API : 위도 / 경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스, 무료 계정으로 api key를 받아서 이를 호출 시에 사용하면 된다.
https://openweathermap.org/price
우리가 만드려는 DAG는 서울 8일 낮/최소/최대 온도를 가져오는 DAG를 만드려고 한다. 절차는 다음과 같다.
request_url = https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={APIkey}&units=metric
위의 Open Weather 링크에서 회원가입을 진행하고 API key를 발급받은 뒤 Airflow에서 Variable을 추가하자. Variable 이름은 open_weather_api_key로 만들어 주자.

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("jwa4610", "weather_forecast")
primary key란 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드를 말한다. 보통 이를 CREATE TABLE 사용 시 지정한다. 관계형 DB에서는 Primary Key의 값이 중복 존재하는 것을 시스템적으로 막아주지만, 데이터 웨어하우스 시스템에서는 이를 보장해주지 않는다. 그 이유는 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문이다.
앞서 만든 날씨 정보 테이블을 보자.
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);

uniqueness를 보장하기 위해 ROW_NUMBER를 사용해서 테이블을 구성하자.

작업들을 다시 나열하면
이렇게 존재하는 Primary Key를 기준으로 존재하는 레코드라면 새 정보롤 수정하거나, 존재하지 않는 레코드라면 새 레코드로 적재하는 과정을
UPSERT라고 한다. 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원한다.
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table, lat, lon, api_key):
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# 원본 테이블이 없다면 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 임시 테이블 데이터 입력
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift_v2',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("jwa4610", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 중요하다. 가능하면
Full Refresh를 사용하는 것이 좋지만,Incremental Update시 실수등으로 데이터가 빠지는 일이 생길 수 있고, 과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야할 상황이 발생할 수 있다.

Backfill 이란 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미한다. Backfill 해결은 Incremental Update에서 복잡해지는데, Airflow에서는 실패한 데이터 파이프라인의 재실행이 잘 디자인되어 있기 때문에 Backfill 문제에서 용이하다.
Airflow의 접근방식은 다음과 같다
