
์ฃผ์ : ๊ธฐ์์ฒญ Open API๋ฅผ ํ์ฉํด ๋ค์ํ ์ฃผ์ ์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ํ๋ณ๋ก ๊ตฌ์ฑ
ํ ์ ๋ต: ํ๋์ ํฐ ์ฃผ์ ๋ฅผ '๊ธฐ์์ฒญ API ํ์ฉ'์ผ๋ก ์ก๊ณ , ๊ฐ์ ๋ค๋ฅธ ์ธ๋ถ ์ฃผ์ ๋ก ํ์ดํ๋ผ์ธ์ ์ค๊ณ
๋ด ํํธ: "๊ณตํด์ผ ๋ฐ ์ฃผ๋ง์ ๋ ์จ ๋ง์กฑ๋ ๋ถ์"
๊ธฐ๊ฐ: 2025.06.03 ~ 2025.06.12
๊ธฐ์ ์คํ:
requests, holidays, datetime, pandasApache AirflowSnowflakeSlack Webhook
๊ธฐ์์ฒญ ๋ ์จ ์ ๋ณด API์์ ํ ์คํธ ํฌ๋งท ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ณ ํ์ฑํ์ฌ ์ฌ์ฉ
holidays ํจํค์ง๋ฅผ ์ฌ์ฉํด ๊ณตํด์ผ ์ฌ๋ถ ํ๋ณ, ์ฃผ๋ง ํฌํจ ์ฌ๋ถ ํ๋จ
๋ ์ง๋ณ๋ก ๊ธฐ์จ, ๊ฐ์๋, ์๋์ต๋ ๋ฑ์ ํ์ฉํ์ฌ ๋ง์กฑ๋ ์ ์ ์ฐ์
Airflow DAG๋ก ์ ์ฒด ํ๋ฆ์ ์๋ํํ๊ณ , Snowflake์ ์ต์ข ๊ฒฐ๊ณผ ํ ์ด๋ธ ์ ์ฌ
Slack Webhook์ ์ด์ฉํ ๊ฒฐ๊ณผ ์๋ฆผ ์๋ํ๊น์ง ํฌํจํ End-to-End ์ค๊ณ
[๊ธฐ์์ฒญ ๋ ์จ API]
โ extract
[ํ์ํ ์ง์ญ/๊ธฐ๊ฐ ๋ฐ์ดํฐ ์์ง]
โ transform
[์ฃผ๋ง/๊ณตํด์ผ ํํฐ๋ง โ ์์ฝ โ ์ ์ ์ฐ์ถ]
โ load
[Snowflake ๋๋ ๋ด๋ถ DB์ ์ ์ฌ]
โ notify
[Slack ๋ฉ์์ง๋ก ์๋ฆผ ์ ์ก]
extract(region_name: str) (TASK)region_name โ region_code๋ก ๋ณํ ํ ์์ฒญYYYYMMDD0000 ~ YYYYMMDD2300records) ๋ฐํtransform(extracted: dict) (TASK)
is_weekend ๋๋ is_holiday ์กฐ๊ฑด ํํฐ๋งsummarize_weather()๋ก ํ๊ท ๊ธฐ์จ, ์ต๋, ํ์, ๊ฐ์๋ ์์ฝcalculate_outdoor_score()๋ก ์ ์ ์ฐ์ถ load(transformed: dict) (TASK)
insert_weather_score() ํจ์ ํธ์ถnotify(transformed_listL List[dict]) (TASK)

Airflow DAG ๊ตฌ์ฑ:
with DAG(
dag_id="weather_pipeline_dag",
start_date=datetime(2025, 1, 6), # ์์์ผ ์์
schedule_interval="@weekly",
catchup=True,
...
)
๋ฐฑํ ๋ช ๋ น ์คํ:
airflow dags backfill -s 2025-01-02 -e 2025-06-02 weather_pipeline_dag
๋ฌธ์ : backfill์ ์คํํ์์๋ ๋ถ๊ตฌํ๊ณ ๊ณผ๊ฑฐ ๋ฐ์ดํฐ๊ฐ DB์ ์ ์ฅ๋์ง ์์๋ค ๐ฑ
โ ํด๊ฒฐ
- backfill์ ์คํ ์ด๋ ฅ์ด ์๋ ๋ ์ง์๋ง ์๋
- DAG ์คํ ์ด๋ ฅ์ด ์๊ฑฐ๋ ์คํจ ๊ธฐ๋ก์ด ์์ผ๋ฉด ํด๋น ๋ ์ง๋ ๊ฑด๋๋
- ํด๊ฒฐ: Airflow UI์์ DAG๋ฅผ ์ญ์ ํ๊ณ ๋ค์ ๋ฑ๋กํ ํ backfill ์คํ
๐ ๋ฐฐ์ด ์
โ ํด๊ฒฐ
extracted_list = extract.expand(region_name=regions)
transformed_list = transform.expand(extracted=extracted_list)
loaded_list = load.expand(transformed=transformed_list)
notify.override(trigger_rule=TriggerRule.ALL_SUCCESS)(loaded_list)
๐ ๋ฐฐ์ด ์
expand()๋ก ๋ฆฌ์คํธ ๊ธฐ๋ฐ ๋ณ๋ ฌ ์ฒ๋ฆฌ ๊ฐ๋ฅ MERGE ๋ฌธ์ผ๋ก ๋์ผ ๋ ์ง์ ๋ฐ์ดํฐ๋ฅผ ์
๋ฐ์ดํธํ๊ฑฐ๋ ์๋ก ์ฝ์
โ ํด๊ฒฐ
MERGE INTO WEATHER_DB.JINYOUNG.WEATHER_SCORE tgt
USING (
SELECT '{region_code}' AS region_code, '{date}' AS date, ...
) src
ON tgt.region_code = src.region_code AND tgt.date = src.date
WHEN MATCHED THEN
UPDATE SET ...
WHEN NOT MATCHED THEN
INSERT (...) VALUES (...);
๐ ๋ฐฐ์ด ์
MERGE ๋ฌธ์ ๋ฉฑ๋ฑ์ฑ๊ณผ ์ต์ ์ฑ ๋์ ๋ณด์ฅ NoneType, ๋ฆฌ์คํธ ์ธ๋ฑ์ค ์๋ฌ, Slack ๋ฉ์์ง ํฌ๋งท ์ค๋ฅ ๋ฑ ๋ค์ํ ๋ฌธ์ ๊ฐ ๋ฐ์โ ํด๊ฒฐ
if not filtered:
print("์ฃผ๋ง/๊ณตํด์ผ ๋ฐ์ดํฐ ์์, ์ ์ฌ ์คํต")
return None

๐ ๋ฐฐ์ด ์
โ ํด๊ฒฐ
def send_slack_message(message: str):
webhook_url = Variable.get("slack_webhook_url")
payload = {"text": message}
response = requests.post(webhook_url, json=payload)
if response.status_code == 200:
print("โ
Slack ์ ์ก ์ฑ๊ณต")
else:
print(f"โ Slack ์ ์ก ์คํจ: {response.status_code}, {response.text}")
Slack Webhook URL์ Airflow ํ๊ฒฝ ๋ณ์์ ์ ์ฅ
์๋ฆผ ๋ฉ์์ง๋ ํ ์คํธ ์ ๋ ฌ ๋ฐ format_slack_message() ํจ์๋ก ํฌ๋งท ๊ตฌ์ฑ
notify() task์์ ์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ์ทจํฉ ํ ์ฌ๋ ๋ฉ์์ง ์ ์ก

๐ ๋ฐฐ์ด ์
๋ฐ์ดํฐ ๋ถ์ ๊ฒฐ๊ณผ๋ฅผ ๋จ์ ์ ์ฅ์ด ์๋๋ผ ์๋ฆผ/์๊ฐํ๋ก ์ฐ๊ฒฐํ๋ ๊ฒฝํ์ด ์ธ์ฌ์ดํธ ์ ๋ฌ์ ์ค์ํจ
ํ์๋ค๊ณผ ๋น ๋ฅด๊ฒ ๊ฒฐ๊ณผ๋ฅผ ๊ณต์ ํ ์ ์์ด ์ค์๊ฐ ํ์ ์๋ ๋งค์ฐ ์ ์ฉ