2. airflow-day4-2

data_hamster·2023년 6월 16일
0
post-custom-banner

학습주제
Open Weathermap API로 대그 구현

학습내용

풀리프래쉬로 구현
레드쉬프트에서 어떠한 형태로 PK 유일한지 설명 후
DAG를 인크리멘탈 업데이트로 바꾸어 본다.

앞서 만든 DAG와 비슷하게 진행


api key를 받아서 이를 호출시에 사용
https://openweathermap.org/price
위도와 경도를 주면 그 지역의 기후정보를 알려준다.
프리 옵션도 있지만 유료 옵션도 있음.
전에는 크레딧 카드를 등록하지 않아도 사용할 수 있었지만
이번엔 크레딧 카드도 등록해야함
API keys에서 키를 할당 받을 수 있음.

Pay as you call로 등록함

우상단에 내 닉네임 하단에
my api keys에 가면
api 키 나옴.


8일간의 향후 낮, 최소, 최대를 읽어다
레드쉬프트 테이블에 적재.
처음엔 풀리프레쉬
이후 인크리멘탈 업데이트 시도.

API 키는 open_weather_api_key라는 Variable로 에어플로우 웹 UI에서 저장.


api라는 이름이 부여되니, 값이 자동 마스킹 처리됨.
서울의 위도와 경도를 찾아야 함 (입력값)
무료버전으 one_call 만 사용할 수 있음.

파이썬의 requests 모듈을 사용해서 호출해볼 예정. -> 환경설정에 추가해놓음.
units=metrc 화씨가 아닌 섭씨로 받겠다.


다음 8일간.

CREATE TABLE kjw9684k.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);

코드 안에서 처리할 예정.
당장은 필요하지않지만 created_date라는 디폴트 값을 갖는 필드를 만들어 놓음
이 정보를 이용해 인크리멘탈 업데이트를 할 때 중복처리를 할 예정.

created_date timestamp default GETDATE()는 SQL 문에서 칼럼을 정의하는 방법 중 하나입니다. 이 문장에서 created_date는 칼럼의 이름, timestamp는 데이터 타입을 나타내고, default GETDATE()는 이 칼럼의 기본값을 현재 날짜와 시간으로 설정하겠다는 의미입니다.
즉, 새로운 레코드가 이 테이블에 추가될 때 created_date 칼럼에 명시적으로 값이 주어지지 않으면, 기본적으로 현재 날짜와 시간(GETDATE()의 결과)이 이 칼럼에 저장됩니다.

디폴트 값이 있기 때문에 현재 시간으로 세팅이 됨.

7일이 아니라 8일
requests에서 get으로 API를 받아서, .json으로 변환해주면 될꺼 같음.
url 입력 방법은 아까 앞에서 나왔음.
딕셔너리 형태를 보면 다양한 필드가 있음.

dt 필드가 숫자로 표현되어 있음 .epoch일고 70년 1월 1일 이후를 밀리세컨드로 계삲됨.
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
이대로 가져다 쓰면 됨. 에폭에서 년월일 형태도 바꿔줄 수 있음.

인자로 주었더 위도 경도,
지역이 어디인지
daily라는 리스트가 있음 저 필드만 읽어오면 됨. 리스트고 날짜별로 레코드가 하나씩 있음. dt가 하나씩 있음 에폭이라고 함. 해가 뜬 시간, 해가 진 시간, 달이 뜬 시간 등. temp 밑에 들어가보면 day 낮온도, min, max 등이 있음.
데일리를 루프를 돌면서 dt 변환, temp 밑에 day, min, max를 뽑아서
하나의 레코드화 시켜서 레드쉬프트에 적재 예정.


이번엔 특이하게 autocommit을 False로 할 것임

리프레쉬
기존의 INSERT INTO 건바이 건으로 적재
COPY를 사용해봄. 벌크 업데이트임.
이게 될려면 extract transform 데이터 웨어하우스에 적재.
s3에 로딩되어야 함. csv, json 등 특정 로케이션에 적재. 데이터 웨어하우스에 벌크 업테이트 할 것임.
s3에 파일을 적재하는 스텝이 하나 추가될 것임.
지금까진 INSERT INTO로 적재 해볼것임


API key는 variable에 적재
https://github.com/learndataeng/learn-airflow/blob/main/dags/Weather_to_Redshift.py
코드.

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

def get_Redshift_connection():
	# autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()
    
@task
def etl(schema, table):
	api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780
    
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)
    
    ret = []
    for d in data["daily"]:
    	day = datetime.fromtimestamp(d["dt"]).strftime("%Y-%m-%d")
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
        
    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
    CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
    );
    """
    # 리스트를 ,로 구분된 긴 문자열을 만들고 이걸 각 value가 됨.
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
    logging.info(drop_recreate_sql)
    logging.info(insert_wql)
    try:
    	cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
    	cur.execute("Rollback;")
        raise
        
with DAG(
	dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30) # 날짜가 미래인 경우 실행이 안됨.
    schedule = '0 2 * * *', # 매일 새벽 2시
    max_active_run = 1,
    catchup = False,
    default_args = {
    	'retries': 1,
        'retry_delay': timedelta(minutes=3),
        }
) as dag:

	etl("kjw9684k", "weather_forecast")

오토 커밋 디폴트는 false
etl task 한개만 만들었음.
스키마, 테이블 이름을 인자로 받음
variable로 .get으로 받아놓음
위도 경도는 하드 코딩할 수 도 있고,
인크리멘탈 업데이트 시엔 위도, 경도도 함수 인자로 바꿔볼 예정
url f스트링 이용해서 위도 경도, api 키.
metircs로 받고싶다.
get으로 받아서 json.loads(response.text)
또는 response.json()도 가능함.

큰 딕셔너리가 됨
데이터의 daily만 루프를 8번 돌게됨.
hourly를 돌면 8 * 24 를 돌게됨.
일별 날씨정보 d를 읽어다가
dt 필드는 epoch이기에 datetime.fromtimestamp.strftime
특이하게 이번엔 append를 "( )"로 감싸서 스트링 형태로 받음.
-> 이후 INSERT INTO 때 사용

풀리프레쉬기 때문에
먼저 DROP TABLE IF EXISTS

전에는 루프를 돌면서 INSERT INTO를 했지만, 이번엔 8개기 때문에 .join(ret)을 이용하여 한번에 집어넣게 됨.
sql문에 대한 기본 이해 필요

except 블록은 웬만하면
rasie 쓰기

            
profile
반갑습니다 햄스터 좋아합니다
post-custom-banner

0개의 댓글