주요 정보 : https://hongcana.tistory.com/122
airflow.cfg에는 두 종류의 타임존 관련 키가 존재함.
start_date, end_date, schedule은 default_timezone에 지정된 타임존을 따라감.
execution_date와 로그 시간
가장 좋은 방법은 UTC로 일관하여 사용하는 것.
Airflow는 dags 폴더를 주기적으로 스캔하는데 DAG 모듈이 있는 모든 파일들의 메인 함수가 실행되며, 본의 아니게 테스트 코드도 실행될 수 있음.
# 5분 마다 테이블 삭제.........
from airflow import DAG
...
cur.execute("DELETE FROM ...")
https://restcountries.com/v3/all 에서 나라들의 정보를 얻어 ETL 프로세스를 적용해 보는 것이 실습 내용.
- Full Refresh로 구현.
- API 결과에서 아래 다음 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성. ( 1.country->["name"]["official"], 2.population->["population"], 3.area->["area"] )
- 해당 DAG가 UTC로 매주 토요일 오전 06:30에 실행되도록. (참고 자료: https://velog.io/@jskim/Airflow-%EC%8A%A4%EC%BC%80%EC%A4%84%EB%A7%81-%ED%8C%8C%ED%97%A4%EC%B9%98%EA%B8%B0-1%ED%8E%B8)
- 이를 개인 repo에 만들기.
내가 쓴 코드.
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
import pandas as pd
import logging
import requests
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
country TEXT, # primary key가 빠짐
population TEXT, # INT로 변환해야 함.
area TEXT # FLOAT으로 변환해야 함.
);""")
@task
def extract():
logging.info(datetime.utcnow())
data = requests.get("https://restcountries.com/v3/all").json()
return data.text
@task
def transform(data):
logging.info("transform started")
records = []
for i in range(len(data)):
records.append([data[i]["name"]["official"], data[i]["population"], data[i]["area"]])
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]});"
print(sql)
cur.execute(sql)
# 원본 테이블 생성
_create_table(cur, schema, table, True)
# 임시 테이블 내용을 원본 테이블로 복사
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'CountryInfo',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '30 6 * * SAT' # 매 주 토요일 오전 06:30
) as dag:
data = extract()
records = transform(data)
load("jaeho", "country_info", records)
https://openweathermap.org/price 에서 무료로 API를 받아서 (위도, 경도)로 해당 위치의 기후 정보 얻기.
- 서울 향후 8일 간의 낮/최소 및 최대 온도 읽기.
- API Key를 open_weather_api_key라는 Variable로 저장.
- 처음에 Full Refresh로 작성.
- 이를 Incremental Update 방식으로 수정하기.
- API URL :
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}
테이블 정의
CREATE TABLE jaeho.weather_forecast(
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
Full Refresh 버전(autocommit=False). Weather_to_Redshift.py
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 매일 02:00에 실행됨.
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("jaeho", "weather_forecast")
```
데이터 웨어하우스에서 Primary Key Uniqueness를 보장하는 방법을 알아봄. (일반적으로 데이터 웨어하우스는 PK Uniqueness를 보장하지 않음.)
먼저 Primary Key Uniqueness란, 테이블에서 하나의 레코드를 유일하게 식별할 수 있는 필드(들)를 의미함.
빅데이터 기반 데이터 웨어하우스들이 PK를 지키지 않는 이유.
- 데이터가 많다 보니, 이를 보장하기 위한 메모리와 시간이 많이 소요되기 때문에 대용량 데이터의 적재에 걸림돌이 됨. (이를 보장하는 것은 데이터 인력(엔지니어, 분석가)의 역량임. ETL 혹은 ELT 프로세스에서 처리.)
PK 유지 방법.
- 날씨 정보의 경우 최근 정보가 신뢰성이 더 높으니, 중복 레코드에 대해서 created_date 필드값이 더 최근인 레코드를 선택함.
- SQL 문법 중 ROW_NUMBER를 활용할 수 있음.
예시) date / created_date / temp 의 필드를 갖는 레코드.
ROW_NUMBER() OVER(partition by date order by created_date DESC) seq
로 같은 date 중 created_date가 최근인(seq=1인) 레코드를 선택 가능함.
PK 유지 방법 구체화.
- 임시 테이블(스테이징 테이블)을 만들고 현재 모든 레코드를 복사.
- 임시 테이블에 새롭게 읽은 데이터 소스 레코드 복사. (중복 존재 가능)
- 중복을 걸러주는 SQL 작성. (위처럼 ROW_NUMBER() 활용 등)
- 위의 SQL의 결과를 최종 원본 테이블에 복사. (원본 테이블 레코드 삭제 -> 임시 테이블 값 복사)
weather_forecast 버전(autocommit=False). Weather_to_Redshift_v2.py
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table, lat, lon, api_key):
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# 원본 테이블이 없다면 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 임시 테이블 데이터 입력
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift_v2',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("jaeho", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
Upsert란, PK를 기준으로 중복되는 레코드가 존재한다면 새 정보로 수정하고, 아니면 새 레코드로 적재하는 것을 의미함. (앞의 과정을 Upsert라고 함.)
- 보통 데이터 웨어하우스마다 본인만의 UPSERT 문법이 존재함.
본 내용은 Incremental Update 시에만 의미가 있는 내용임.
Backfill의 용이성 여부 -> 데이터 엔지니어의 업무 능률을 좌우함.
- Backfill : 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터를 다시 다 읽어오는 것.
- Backfill은 Full Refresh에서는 간단하지만 Incremental Update에선 복잡함.
- 즉, 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가가 중요하며, 잘 디자인된 대표적인 예가 Airflow임.
그렇다면 어떤 식으로 ETL을 구현해야 좋을까?
- 날짜별로 backfill 결과를 기록하고 성공 여부를 기록.
- Airflow의 경우, ETL 별로 실행 날짜와 결과를 메타데이터 데이터베이스에 기록함.
- 따라서 DAG 실행 시에 저장되는 execution_date를 바탕으로 데이터를 갱신하도록 코드를 작성해야함.