Open Weathermap DAG 구현하기
Primary Key Uniqueness 보장하기
Backfill과 Airflow
Open Weathermap API
위도 / 경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
무료 계정으로 API Key 받아서 호출 시 사용
결과 JSON에서 daily라는 필드에 앞으로 8일 간의 날씨 정보가 들어있음
daily는 리스트이며, 각 레코드가 하나의 날짜에 해당
날짜 정보는 dt라는 필드에 들어있으며, 아래와 같이 읽을 수 있는 날짜로 변경
datetime.fromtimestamp(d[”dt”].strftime(”%Y-%m-%d”))
temp 필드가 온도 정보를 나타냄
제작하려는 DAG: 8일 간 서울의 낮 / 최소 / 최대 온도 읽기
API Key를 open_weather_api_key라는 Airflow Variable로 저장
서울의 위도 / 경도 찾기
One-Call Api 사용
API Key와 서울의 위도 / 경도 정보를 사용해 위의 API를 requests 모듈로 호출
응답 결과에서 오늘로부터 7일 간 온도 정보(평균 / 최대 / 최소)만 출력
DAG 구현
weather_forecast 테이블 생성
CREATE TABLE weather_forecast(
date DATE PRIMARY KEY,
temp FLOAT,
min_temp FLOAT,
max_temp FLOAT,
created_date TIMESTAMP DEFAULT GETDATE()
);
One-Call API는 결과를 JSON 형태로 반환
이를 읽기 위해 requests.get 결과의 text를 JSON으로 변환해야 함
혹은, requests.get 결과 오브젝트가 제공하는 .json() 함수 사용
f_js = requests.get(link).json()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도, 경도
lat = 37.5665
lon = 126.9780
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api_key}&units=metric"
response = requests.get(url)
data = json.loads(response.text)
for d in data["daily"]:
day = datetime.fromtimestamp(d[”dt”].strftime(”%Y-%m-%d”))
ret.append("('{}', {}, {}, {})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_DB_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = "Weather_to_Redshift",
start_date = datetime(2023, 5, 30),
schedule = "0 2 * * *",
max_active_runs = 1,
catchup = False,
default_args = {
"retries": 1,
"retry_delay": timedelta(minutes = 3)
}
) as dag:
etl("schema_name", "weather_forecast")
Primary Key Uniqueness
관계형 DB 시스템이 PK값의 중복을 막아주는 것
빅데이터 기반 DW들은 PK Uniqueness를 보장하지 않음
보장하는데 메모리와 시간이 더 소모되므로 대용량 데이터 적재가 어려워지기 때문
데이터 인력이 이를 보장하기 위한 작업을 해야 함
PK Uniqueness 보장 방법
앞서 생성한 weather_forecast 테이블을 예로 설명
CREATE TABLE weather_forecast(
date DATE PRIMARY KEY,
temp FLOAT,
min_temp FLOAT,
max_temp FLOAT,
created_date TIMESTAMP DEFAULT GETDATE()
);
PK가 중복될 경우, PK를 기준으로 Partition한 뒤 ROW_NUMBER를 사용
ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
임시 테이블(스테이징 테이블) 사용
임시 테이블을 만들고 현재 모든 레코드 복사
CREATE TEMP TABLE t AS SELECT * FROM weather_forecast;
임시 테이블에 데이터 소스에서 새롭게 읽은 레코드 복사
중복 제거
원본 테이블에서 레코드 삭제 후 임시 테이블 복사
DELETE FROM weather_forecast;
SELECT date, temp, min_temp, max_temp
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
Upsert란?
(PK를 기준으로) 존재하는 레코드라면 새 정보로 수정, 존재하지 않는 레코드라면 새 레코드 적재
보통 DW에서 효율적인 Upsert 문법을 지원
Incremental Update는 효율성은 더 좋을 수 있지만, 실수 등으로 인해 데이터가 누락될 수 있어 운영과 유지보수의 난이도가 올라감
실패한 데이터 파이프라인을 재실행하거나, 읽어온 데이터에 문제가 있어 다시 모두 읽어오는 Backfill 기능이 중요
Full Refresh는 그냥 다시 실행하면 되지만, Incremental Update에서는 Backfill이 복잡하기 때문에 가능하면 Full Refresh를 사용하는 것이 좋음
Airflow의 Backfill
Airflow는 강력한 Backfill 기능을 제공
접근 방식
ETL별로 실행 날짜와 결과를 메타데이터 DB에 저장
모든 DAG에 지정되어있는 execution_date를 바탕으로 데이터를 갱신하도록 코드 작성
변수
start_date: DAG가 처음 읽을 데이터의 날짜 / 시간
execution_date: DAG가 읽어와야하는 데이터의 날짜 / 시간
catchup: start_date보다 이후에 DAG를 활성화한 경우, 그 사이에 실행되지 않아 누락된 데이터를 처리할 방법을 결정
True(default): 누락된 데이터를 모두 채움
False: 누락된 데이터 무시
end_date: 보통은 필요하지 않으며, Backfill을 날짜 범위에 대해 수행하는 경우에만 필요
비용이 비싼 쿼리가 Airflow에 의해 스케줄링될 경우, 의도치 않게 여러 번 수행되어 많은 비용이 발생할 수 있으니 주의
Airflow와 타임존
airflow.cfg에는 default_timezone, default_ui_timezone 두 개의 타임존 관련 키가 존재
start_date, end_date, schedule: default_timezone에 지정된 타임존을 따름
execution_date, 로그 시간: 항상 UTC를 따름
DAGS 폴더에서 코드 작성 시 주의할 점
airflow는 DAGS 폴더를 주기적으로(5분마다) 스캔함
이때 DAG 모듈이 들어있는 모든 파일의 메인 함수가 실행됨