지금까지 dag를 만들어 Airflow에 업로드 하는 과정도 작업을 진행해보았고 dags에 업로드할 파이썬 코드를 직접 구현해보기도 했다.
오늘은 이어서 airflow를 사용하는 가장 큰 이유 중 하나인 backfill에 대해 학습을 진행하고자 한다.
그전에 Airflow 타임존에 대해 살펴보면 다음과 같다.
- airflow.cfg에는 default_timezone, default_ui_timezon으로 2 종류가 존재.
- start_date, end_date, schedule -> default_timezone을 따름
- 웹 서버 UI 상 타임존(webserver 간 타임 표시) -> default_ui_timezone
- execution_date와 로그시간은 항상 UTC를 따르므로 타임존 고려해서 변환 해 사용할 필요 있음.
v1_OpenWeather API를 이용해 오늘로부터 향후 8일의 시간별 날씨 정보 가져옴
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
import requests
import logging
import json
from datetime import datetime
from datetime import timedelta
# autocommit is False by default
def get_Redshift_connection():
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
ret = []
print("check data : ", data)
# daily에는 향후 8일간 날씨 정보 들어감
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # epoch의 형식 변환
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) # 한 번에 insert함
logging.info(drop_recreate_sql)
logging.info(insert_sql)
# 예외처리 (raise 잊지말기)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30),
schedule = '0 2 * * *',
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("taejun3305", "weather_forecast")
테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드로 보통 create table 구문 사용 시 지정하게 된다.
일반적으로 사용되는 primary key로는 email이나 product_id 등이 사용된다.
create table 구문에서는 다음과 같이 예시로 사용된다.
CREATE TABLE table (
product_id VARCHAR(50) PRIMARY KEY,
name VARCHAR(50),
);
# 2개의 필드를 primary ket로 지정하는 경우
CREATE TABLE table_a (
user_id INT,
product_id VARCHAR(50),
PRIMARY KEY (user_id, product_id)
# 아래는 안적어도 되긴 함. (그러나 가독성을 위해 작성하면 good)
FOREIGN KEY (product_id) REFERENCES table (product_id)
);
그러나 일반적으로 빅데이터 기반 DW들은 Primary key를 보장하지 않는다.
- primary key 기준으로 유일성 보장 X (보장 시 메모리, 시간이 더 드는 비효율성 문제 발생)
-> 데이터 엔지니어가 직접 이를 구현할 줄 알아야 함. (필드 별 유니크 저장 등)- 자체적인 필드 생성(ROW_NUMBER 등)으로 유니크한 필드 생성
- staging table 생성으로 원본 유지 必
🎈 Upsert
- Primary key 기준으로 존재하는 레코드인 경우 새 정보로 수정
- 존재하지 않는 레코드인 경우 새 레코드로 적재
- 보통 DW마다 UPSERT 작업을 효율적으로 해주는 문법을 지원해줌.
구현한 코드를 바탕으로 설정한 schedule에 따라 작업이 매번 실행될텐데 어느 시점만 실행되지 않았다면 문제가 발생할 수 있다. 이러한 문제는 airflow를 통해 효과적으로 해결할 수 있다.
Incremental Update의 경우 Full Refresh에 비해 효율성이 더 좋을 수 있지만 유지보수의 난이도가 올라가게 된다.
이는 어쩌면 당연한 얘기일 수 있는데 은행 입출금 내역을 예시로 생각해보면 다음과 같다.
유저당 입출금 내역 정보들은 기존 데이터가 절대로 변경되서는 안되므로 새로운 데이터에 한해 Insert 작업만 처리하면 되므로 Incremental Update가 가능하다. 하지만 유저정보만을 보면 주소나 휴대폰 번호가 변경, 회원 탈퇴의 경우 기존 데이터를 update, delete 작업을 거쳐야 하므로 Full Refresh를 통해 처리할 수 있다. 이때 새로운 데이터를 단순 Insert하는 과정에 있어 Incremental Update의 경우 schedule의 주기가 상당히 중요해지게 된다.앞서 이야기 한대로 엔지니어가 구축한 인프라 내 파이프라인을 거치며 데이터가 이동하게 되는데 실행되지 않은 특정 시점의 데이터를 처리해주기 위해선 backfill이 반드시 필요하다.
- backfill : 실패한 데이터 처리로 인해 다시 읽어와야 하는 경우
-> Full Refresh인 경우 다시 실행하면 되지만, Incremental의 경우 복잡한 backfill이 된다.
💯 이때 파이프라인의 재실행이 상당히 용이한 framework가 우리가 지금 학습하고 있는 Airflow이다.
일반적으로 daily DAG의 경우 지금 시간 기준으로 어제 날짜를 계산해 어제에 해당하는 데이터를 읽게 된다.
< 코드 예시 >from datetime import datetime, timedelta y = datetime.now() - timedelta(1) yesterday = datetime.strftime(y, '%Y-%m-%d') sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
그러나, 만약 1년치 데이터를 backfill해야 한다면 위 작업을 365번 루프 실행해야 하는 문제가 발생해 error가 발생하거나 정합성이 깨질 수 있다.
-> 따라서 결국 dag 작성 시 backfill을 고려하여 코드를 구현하는 것이 상당히 중요하다
Airflow에서 해결하는 방법은 다음과 같다.
1. execution_date (시스템적으로 구현) -> 시스템이 지정해준 날짜를 사용하기
-> 즉, backfill을 쉽게 하도록 해당 dag가 실행되는 시점을 결과로 나오도록 구현하여 에러 발생한 시점 확실히 하자는 뜻이다.
2. daily incremental update 구현 시 하루 전날 기준 데이터 작동함.
-> 즉, 2023년 1월 1일에 dag를 실행하면 2022년 12월 31일 데이터를 이용하게 된다는 의미로 start_date은 DAG 실행 날짜 전날로 데이터를 처음 읽어오는 시점이라는 것을 뜻하고 실제 첫 실행된 날짜(시점)는 start_date + DAG Schedule을 뜻한다.🎈 catchup 상세 해설
DAG생성 시 변수인 catchup은 default로 True를 가진다.
daily DAG ("0 * * * *")에서 catchup이 True라는 의미는 start_date이 만일 2023 5월 30일이고 6월 5일 오전 9시에 해당 DAG를 활성화시켰다면 execution_date은 5월 31일, 6월 1일, 6월 2일, 6월 3일, 6월 4일, 6월 5일로 총 6번 실행될 것이다.
yahoo finance API로부터 주식 정보 가져와 DAG 생성하기
단 Primary key Uniqueness 보장하지 않는 상황에서 테이블 생성하는 방법
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@ task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row['Open'], row['High'], row['Low'], row['Close'], row['Volume']])
return records
def create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLES IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint,
created_date timestamp DEFAULT GETDATE()
);""")
@task
def load(schema, table, records):
logging.info('load started')
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블 없으면 생성
_create_table(cur, schema, table, False)
# 임시 테이블을 원본테이블로 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 임시테이블을 원본테이블블로 복사
cur.execute(f"DELETE FROM {schema}.{table}")
cur.execute(f"""
INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
HWERE seq = 1; """)
cur.execute("COMMIT;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info('load finished')
with DAG(
dag_id = 'UpdateSymbol_v3',
start_date = datetime(2022, 5, 30),
catchup=False,
tags=['API'],
schedule='0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("taejun3305", 'AAPL_Stock_Info_v3', results)