📖 학습주제
데이터 파이프라인, Airflow (4)
Open Weathermap DAG 구현하기
Open Weathermap API 소개
만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기
- API Key를 open_weather_api_key라는 Variable로 저장
- 서울의 위도와 경도를 찾을 것
- One-Call API를 사용: https://openweathermap.org/api/one-call-api
- 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
- 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
DAG 구현
- Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의
낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는
테이블로 저장
- https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
- weather_forecast라는 테이블이 대상이 됨
- 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
-
One-Call API는 결과를 JSON 형태로 리턴해줌
- 이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함(아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용)
f = requests.get(link)
f_js = f.json()
-
결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음
- daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
- 날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
-
Airflow Connections를 통해 만들어진 Redshift connection
- 기본 autocommit의 값은 False인 점을 유의
-
두 가지 방식의 Full Refresh 구현 방식
- Full Refresh와 INSERT INTO를 사용
-
DW상의 테이블은 아래처럼 정의
CREATE TABLE 각자스키마.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
Primary Key Uniqueness 보장하기
Primary Key Uniqueness
- 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
- 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
- 이를 CREATE TABLE 사용시 지정
- 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌
예) 아래 테이블의 product_id
CREATE TABLE products (
product_id INT PRIMARY KEY,
name VARCHAR(50),
price decimal(7, 2)
);
- 빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음 : Primary key를 기준으로 유일성 보장을 해주지 않음, 이를 보장하는 것은 데이터 인력의 책임
- Primary key 유일성을 보장해주지 않는 이유 : 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 됨
Primary Key 유지 방법
CREATE TABLE trick.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
- 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음
- 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용, 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택
- 이를 하는데 적합한 SQL 문법이
ROW_NUMBER
혹은
- 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
- 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사 (이 때 중복 존재 가능)
- 중복을 걸러주는 SQL 작성
- 최신 레코드를 우선 순위로 선택
- ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
- 위의 SQL을 바탕으로 최종 원본 테이블로 복사
- 이때 원본 테이블에서 레코드들을 삭제
- 임시 temp 테이블을 원본 테이블로 복사
Upsert
- Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
- 존재하지 않는 레코드라면 새 레코드로 적재
- 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
Backfill과 Airflow
Backfill
- 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
- Backfill 해결은 Incremental Update에서 복잡해짐
- 즉 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?