우선, 6주차 강의가 올라간지 한참 지나서 5주차 리뷰를 쓰는 자신을 반성하며 글을 시작합니다...🥲
🈯 복습 및 숙제 해설
4주차 복습
질문 리뷰
Explain
을 붙이면 됨. 좀 복잡한 형태로 반환함.강조하고 또 강조해도 부족한 start_date, execution_date, backfill
8/10
데이터가 전부 다 쌓인 8/11
8/10
8/11
에 가져와야 하는 데이터의 날짜로 전달받는 execution_date
는 8/10
8/12
에 또 DAG를 시행하면, execution_date
는 8/11
8/14
인데, 데이터를 뜯어보니 8/10~8/12
까지 데이터가 잘못 들어갔다. 이것을 다시 실행하는 게 backfill
try/except 사용시 유의할 점
try :
cur.execute(<sql>)
cur.execute("COMMIT;")
except Exception as e :
cur.execute("ROLLBACK")
raise
autocommit
이
False
인 경우에는
BEGIN
을 실행해도 영향 없음.

🈯 복습 및 숙제 해설
숙제 해설
# 내 버전
def load(**context):
    """Full-refresh load of daily weather rows into Redshift.

    Pulls the transformed rows from the ``transform`` task via XCom, then runs
    one SQL batch: ensure the target table exists, and inside a single
    BEGIN/END transaction delete everything and re-insert all rows.

    Expects ``context["params"]`` to carry ``schema`` and ``table``
    (Airflow op-kwargs convention — confirm against the DAG definition).
    """
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    cur = get_Redshift_connection()
    # lines: presumably a list of daily-forecast dicts from the transform task
    # (each with "dt" epoch and a "temp" dict) — verify against transform().
    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    # IF NOT EXISTS makes the task rerunnable; a plain CREATE TABLE would
    # fail on every run after the first.
    sql = """CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date primary key,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
""".format(schema=schema, table=table)
    # Full refresh: DELETE + re-INSERT inside one explicit transaction so a
    # partial failure cannot leave the table empty.
    sql += "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
    for line in lines:
        if line != "":
            (date, temp, min_temp, max_temp) = (datetime.fromtimestamp(line.get("dt")).strftime('%Y-%m-%d'),
                                                line.get("temp").get("day"),
                                                line.get("temp").get("min"),
                                                line.get("temp").get("max"))
            sql += f"""INSERT INTO {schema}.{table} VALUES ('{date}', '{temp}', '{min_temp}', '{max_temp}');"""
    sql += "END;"
    logging.info(sql)
    # Roll back the open transaction on failure and re-raise so the Airflow
    # task is marked failed — the pattern the try/except notes above describe.
    try:
        cur.execute(sql)
    except Exception:
        cur.execute("ROLLBACK;")
        raise
# 모범 답안
def etl(**context):
    """Fetch the Seoul daily forecast from OpenWeatherMap and full-refresh
    ``keeyong.weather_forecast`` in Redshift (DELETE + bulk INSERT, committed
    explicitly and rolled back on failure)."""
    api_key = Variable.get("open_weather_api_key")
    # Latitude/longitude of Seoul
    lat = 37.5665
    lon = 126.9780
    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    payload = json.loads(requests.get(url).text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    # One "('YYYY-MM-DD',day,min,max)" tuple literal per daily record.
    values = [
        "('{}',{},{},{})".format(
            datetime.fromtimestamp(record["dt"]).strftime('%Y-%m-%d'),
            record["temp"]["day"],
            record["temp"]["min"],
            record["temp"]["max"],
        )
        for record in payload["daily"]
    ]
    cur = get_Redshift_connection()
    insert_sql = """DELETE FROM keeyong.weather_forecast;INSERT INTO keeyong.weather_forecast VALUES """ + ",".join(values)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception:
        # Undo the DELETE as well, then surface the error to Airflow.
        cur.execute("Rollback;")
        raise
"""
CREATE TABLE <db>.<table> (
date date,
temp float,
min_temp float,
max_temp float,
updated_date timestamp default GETDATE()
);
"""
# 내 버전
def load(**context):
    """Incremental load with dedup: stage rows in a temp table, then rebuild
    the target keeping only the newest row per date (ROW_NUMBER over
    created_date DESC).

    Expects ``context["params"]`` to carry ``schema`` and ``table``, and the
    ``transform`` task to push the daily-forecast dicts via XCom.
    """
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    cur = get_Redshift_connection()
    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    # Stage into a temp table built with LIKE ... INCLUDING DEFAULTS.
    # The original CREATE TABLE ... AS SELECT copies data but DROPS column
    # defaults, so the new rows inserted below got NULL created_date and the
    # ORDER BY created_date DESC dedup could not prefer them — the exact
    # flaw discussed in the notes below. DROP IF EXISTS makes reruns safe.
    sql = "DROP TABLE IF EXISTS {schema}.temp_{table};".format(schema=schema, table=table)
    sql += "CREATE TABLE {schema}.temp_{table} (LIKE {schema}.{table} INCLUDING DEFAULTS);".format(schema=schema, table=table)
    sql += "INSERT INTO {schema}.temp_{table} SELECT * FROM {schema}.{table};".format(schema=schema, table=table)
    for line in lines:
        if line != "":
            (date, temp, min_temp, max_temp) = (datetime.fromtimestamp(line.get("dt")).strftime('%Y-%m-%d'),
                                                line.get("temp").get("day"),
                                                line.get("temp").get("min"),
                                                line.get("temp").get("max"))
            # created_date is omitted on purpose: the preserved column
            # default (GETDATE()) stamps each new row.
            sql += f"""INSERT INTO {schema}.temp_{table} VALUES ('{date}', '{temp}', '{min_temp}', '{max_temp}');"""
    # Swap the deduplicated staging contents into the target atomically.
    sql += "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
    sql += """
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp, created_date
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY date
            ORDER BY created_date DESC
        ) seq
    FROM {schema}.temp_{table}
)
WHERE seq = 1;
""".format(schema=schema, table=table)
    sql += "END;"
    logging.info(sql)
    # Roll back the open transaction on failure and re-raise so the Airflow
    # task is marked failed.
    try:
        cur.execute(sql)
    except Exception:
        cur.execute("ROLLBACK;")
        raise
# 모범 답안
def etl(**context):
    """Fetch the daily forecast for the configured lat/lon and upsert it into
    ``{schema}.{table}``: stage into a temp table (defaults preserved via
    LIKE ... INCLUDING DEFAULTS), then rebuild the target keeping only the
    newest row per date. Each SQL batch commits on success and rolls back
    (then re-raises) on failure."""
    params = context["params"]
    schema = params["schema"]
    table = params["table"]
    lat = params["lat"]
    lon = params["lon"]
    api_key = Variable.get("open_weather_api_key")
    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    payload = json.loads(requests.get(url).text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    # One "('YYYY-MM-DD',day,min,max)" tuple literal per daily record.
    values = [
        "('{}',{},{},{})".format(
            datetime.fromtimestamp(record["dt"]).strftime('%Y-%m-%d'),
            record["temp"]["day"],
            record["temp"]["min"],
            record["temp"]["max"],
        )
        for record in payload["daily"]
    ]
    cur = get_Redshift_connection()

    def run_committed(statement):
        # Execute + COMMIT; on any failure ROLLBACK and re-raise so the
        # Airflow task fails (same pattern as each try/except in the notes).
        logging.info(statement)
        try:
            cur.execute(statement)
            cur.execute("COMMIT;")
        except Exception:
            cur.execute("ROLLBACK;")
            raise

    # Recreate the temp (staging) table with the target's defaults intact,
    # seeded with the target's current rows.
    run_committed(f"""DROP TABLE IF EXISTS {schema}.temp_{table};
CREATE TABLE {schema}.temp_{table} (LIKE {schema}.{table} INCLUDING DEFAULTS);INSERT INTO {schema}.temp_{table} SELECT * FROM {schema}.{table};""")
    # Append the freshly fetched rows to the staging table.
    run_committed(f"INSERT INTO {schema}.temp_{table} VALUES " + ",".join(values))
    # Replace the target: newest row per date wins (updated_date DESC).
    run_committed(f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY updated_date DESC) seq
  FROM {schema}.temp_{table}
)
WHERE seq = 1;""")
"""
CREATE TABLE keeyong.weather_forecast (
date date,
temp float,
min_temp float,
max_temp float,
updated_date timestamp default GETDATE()
);
"""
내 코드의 문제점
created_date
컬럼값을 못 받아옴.

차이점:
LIKE {schema}.{table} INCLUDING DEFAULTS
(기존에 있던 테이블 속성을 그대로 받아옴)를 사용. DROP TABLE IF EXISTS
와 LIKE ... INCLUDING DEFAULTS
를 사용하지 않은 게 패인.

Airflow 환경 설정 변경
sudo systemctl restart airflow-webserver
b. sudo systemctl restart airflow-scheduler
from airflow import DAG
쓰면 이걸 Airflow가 스캔해서 실행해버림. → 테스트 코드도 돌아가버려서 비용 Boom🤯

🧐 5주차
production db(MySQL) → DW(Redshift)
기본적인 구조
MySQL → S3 → Redshift
순서