airflow.cfg에는 두 종류의 타임존 관련 키가 존재
a. default_timezone : 언제 DAG가 실행되는지, 스케쥴이 이걸 따라감.
b. default_ui_timezone
start_date, end_date, schedule
a. default_timezone에 지정된 타임존을 따름
execution_date와 로그 시간
a. 항상 UTC를 따름
b. 즉 execution_date를 사용할 때는 타임존을 고려해서 변환 후 사용필요
현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임. 섞어서 쓰면 혼란이 옴.
Airflow는 dags폴더를 주기적으로 스캔함
[core]
dags_folder = /var/lib/airflow/dags
#How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300
이 때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨
from airflow import DAG
...
cur.execute("DELETE FROM ....")
https://restcountries.com/ 을 참고
https://restcountries.com/v3/all 를 호출하여 나라별로 다양한 정보를 얻을 수 있다.
Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할것
API 결과에서 아래 3개의 정보를 추출하여 Redshift에 스키마 밑에 테이블 생성
단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것
import requests
@task
def extract_transform():
url = f"https://restcountries.com/v3/all"
response = requests.get(url)
data = response.json()
records = []
for row in data:
records.append([row["name"]["official"], row["population"], row["area"]])
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table} (
country varchar(255),
population bigint,
area float
);""")
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES (%s, %s, %s);"
print(sql)
cur.execute(sql, (r[0], r[1], r[2]))
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'UpdateWorldAPI',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule_interval='30 6 * * 6', #0 - Sunday, ..., 6 - Saturday
) as dag:
results = get_worlds_info()
load("kyongjin1234", "world_info", results)
위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
무료 계정으로 api key를 받아서 이를 호출시에 사용
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
One-Call API는 결과를 JSON 형태로 리턴해줌
결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어가 있음
daily라는 리스트에 앞으로 7일간의 온도 정보가 들어옴
Airflow Connections를 통해 만들어진 Redshift connection
두 가지 방식의 Full Refresh 구현 방식
CREATE TABLE 각자스키마.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("keeyong", "weather_forecast")