[프로그래머스] 데브코스 데이터엔지니어링 TIL Day 44

주재민·2023년 12월 14일
0
post-thumbnail

📖 학습주제

데이터 파이프라인, Airflow (4)


Open Weathermap DAG 구현하기

Open Weathermap API 소개

  • 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
  • 무료 계정으로 api key를 받아서 이를 호출시에 사용
    https://openweathermap.org/price

만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기

  • API Key를 open_weather_api_key라는 Variable로 저장
  • 서울의 위도와 경도를 찾을 것
  • One-Call API를 사용: https://openweathermap.org/api/one-call-api
    - 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
    - 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
    날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)

DAG 구현

  • Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의
    낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는
    테이블로 저장
  • https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
  • weather_forecast라는 테이블이 대상이 됨
    - 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast (
 date date primary key,
 temp float, -- 낮 온도
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);
  • One-Call API는 결과를 JSON 형태로 리턴해줌
    - 이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함(아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용)

    f = requests.get(link)
    f_js = f.json()

  • 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음
    - daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
    - 날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능

    datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09

  • Airflow Connections를 통해 만들어진 Redshift connection
    - 기본 autocommit의 값은 False인 점을 유의

  • 두 가지 방식의 Full Refresh 구현 방식
    - Full Refresh와 INSERT INTO를 사용

  • DW상의 테이블은 아래처럼 정의

CREATE TABLE 각자스키마.weather_forecast (
 date date primary key,
 temp float,
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);

Primary Key Uniqueness 보장하기

Primary Key Uniqueness

  • 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
    - 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
    - 이를 CREATE TABLE 사용시 지정
  • 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌

예) 아래 테이블의 product_id

CREATE TABLE products (
 product_id INT PRIMARY KEY,
 name VARCHAR(50),
 price decimal(7, 2)
);
  • 빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음 : Primary key를 기준으로 유일성 보장을 해주지 않음, 이를 보장하는 것은 데이터 인력의 책임
  • Primary key 유일성을 보장해주지 않는 이유 : 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 됨

Primary Key 유지 방법

CREATE TABLE trick.weather_forecast (
 date date primary key,
 temp float,
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);
  • 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음
  • 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용, 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택
  • 이를 하는데 적합한 SQL 문법이 ROW_NUMBER

혹은

  • 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
  • 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사 (이 때 중복 존재 가능)
  • 중복을 걸러주는 SQL 작성
    - 최신 레코드를 우선 순위로 선택
    - ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
  • 위의 SQL을 바탕으로 최종 원본 테이블로 복사
    - 이때 원본 테이블에서 레코드들을 삭제
    - 임시 temp 테이블을 원본 테이블로 복사

Upsert

  • Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
  • 존재하지 않는 레코드라면 새 레코드로 적재
  • 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌

Backfill과 Airflow

Backfill

  • 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
  • Backfill 해결은 Incremental Update에서 복잡해짐
  • 즉 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?

0개의 댓글