🌤️ Building a Public Holiday/Weekend Satisfaction Analysis Pipeline (feat. KMA API)

Jinyoung Cheon · June 12, 2025

1. Project Overview

  • Topic: Each team builds data pipelines on a variety of topics using the Korea Meteorological Administration (KMA) Open API

  • Team strategy: Take "using the KMA API" as the one overarching theme, with each member designing a pipeline for a different subtopic

  • My part: "Weather satisfaction analysis for public holidays and weekends"

  • Period: 2025.06.03 ~ 2025.06.12

  • Tech stack:

    • requests, holidays, datetime, pandas
    • Apache Airflow
    • Snowflake
    • Slack Webhook


2. My Role

  1. Collect text-format data from the KMA weather information API and parse it for use

  2. Use the holidays package to determine whether a date is a public holiday and whether it falls on a weekend

  3. Compute a satisfaction score for each date from temperature, precipitation, relative humidity, and other measurements

  4. Automate the whole flow with an Airflow DAG and load the final result table into Snowflake

  5. Design the pipeline end to end, including automated result notifications via Slack Webhook


3. DAG Structure

[KMA weather API]
	↓ extract
[Collect data for the required regions/period]
	↓ transform
[Filter weekends/public holidays → summarize → compute score]
	↓ load
[Load into Snowflake or an internal DB]
	↓ notify
[Send an alert as a Slack message]

  • extract(region_name: str) (TASK)
    • Purpose: Collect weekly weather data per region from the KMA API
    • Logic:
      • When the DAG runs on a Monday, request the previous week's data (Monday through Sunday)
      • Convert region_name → region_code before making the request
      • Request window: YYYYMMDD0000 ~ YYYYMMDD2300
      • Result: return a list of weather data (records)
  • transform(extracted: dict) (TASK)
    • Purpose: Filter down to weekend or public holiday data, then summarize the weather and compute an outdoor score
    • Logic:
      • Filter by date on the is_weekend or is_holiday condition
      • Summarize average temperature, humidity, wind speed, and precipitation with summarize_weather()
      • Compute the score with calculate_outdoor_score()
  • load(transformed: dict) (TASK)
    • Purpose: Store the computed score in the DB or Snowflake
    • Logic:
      • Call the insert_weather_score() function
      • Per-region summary + score → saved to the table
      • Skip the load if the input is None
  • notify(transformed_list: List[dict]) (TASK)
    • Purpose: Pick the top 3 and bottom 3 regions, then send a Slack message
    • Logic:
      • Sort by score in descending order
      • Format messages for the top 3 and bottom 3 regions
      • Send the Slack message (contents: region name, score, weather summary, date, etc.)
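
The request window described for extract (previous Monday through Sunday, formatted as YYYYMMDD0000 ~ YYYYMMDD2300) can be sketched roughly as follows. This is a minimal illustration using only the standard library. The function names `previous_week_window` and `request_range` are hypothetical and do not come from the project code, and the real task may derive the run day from the Airflow context rather than from a plain `date`.

```python
from datetime import date, timedelta
from typing import Tuple


def previous_week_window(run_day: date) -> Tuple[date, date]:
    """Return (Monday, Sunday) of the week before the week containing run_day."""
    # date.weekday() is 0 for Monday, so this steps back to this week's Monday.
    this_monday = run_day - timedelta(days=run_day.weekday())
    prev_monday = this_monday - timedelta(days=7)
    prev_sunday = prev_monday + timedelta(days=6)
    return prev_monday, prev_sunday


def request_range(run_day: date) -> Tuple[str, str]:
    """Format the window as YYYYMMDD0000 and YYYYMMDD2300 strings."""
    start, end = previous_week_window(run_day)
    return start.strftime("%Y%m%d") + "0000", end.strftime("%Y%m%d") + "2300"


if __name__ == "__main__":
    # 2025-06-09 is a Monday, so the window is 2025-06-02 through 2025-06-08.
    print(request_range(date(2025, 6, 9)))
```

Anchoring the window to the Monday of the run week, instead of subtracting a fixed 7 days from the run day, keeps the result stable even if the DAG is triggered manually on another weekday.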


4. Technical Challenges & How I Solved Them

🔎 Problem 1: Ran the backfill command, but no data was loaded

Airflow DAG configuration:

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="weather_pipeline_dag",
    start_date=datetime(2025, 1, 6),  # starts on a Monday
    schedule_interval="@weekly",
    catchup=True,
    ...
)

Running the backfill command:

airflow dags backfill -s 2025-01-02 -e 2025-06-02 weather_pipeline_dag

Problem: even after running the backfill, the past data was not stored in the DB 😱

✅ Solution

- backfill only processes dates that have no run history
- If a date already has a DAG run, or a failed run on record, that date is skipped
- Fix: delete the DAG in the Airflow UI, register it again, then run the backfill

📚 What I learned

  • Dates that already have run history are not backfill targets
  • During development, resetting the DAG can be a quick fix

🔎 Problem 2: Dynamic task mapping for per-region data

  • Processing nationwide data in a single task increases code complexity and the risk of failure

✅ Solution

extracted_list = extract.expand(region_name=regions)
transformed_list = transform.expand(extracted=extracted_list)
loaded_list = load.expand(transformed=transformed_list)
notify.override(trigger_rule=TriggerRule.ALL_SUCCESS)(loaded_list)

📚 What I learned

  • expand() enables list-based parallel processing
  • Effective for parallelism plus detecting failures per individual region

🔎 Problem 3: Handling data idempotency

  • Need to prevent duplicate loads when the DAG is rerun
  • Use a MERGE statement to update the existing row for the same date or insert a new one

✅ Solution

MERGE INTO WEATHER_DB.JINYOUNG.WEATHER_SCORE tgt
USING (
    SELECT '{region_code}' AS region_code, '{date}' AS date, ...
) src
ON tgt.region_code = src.region_code AND tgt.date = src.date
WHEN MATCHED THEN
    UPDATE SET ...
WHEN NOT MATCHED THEN
    INSERT (...) VALUES (...);

📚 What I learned

  • A MERGE statement ensures both idempotency and freshness
  • A very useful approach for Airflow reruns and backfills
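
To make the idempotency property concrete, here is a small self-contained sketch that loads the same key twice and ends up with one row. It is not the Snowflake code from this project. SQLite has no MERGE, so the sketch swaps in `INSERT ... ON CONFLICT DO UPDATE` (available in SQLite 3.24+) as a stand-in with the same update-or-insert effect. The table layout, the `score` column, the region code "108", and the score values are all assumptions made for illustration.

```python
import sqlite3


def upsert_score(conn: sqlite3.Connection, region_code: str, date: str, score: float) -> None:
    """Insert a row, or update the score if (region_code, date) already exists."""
    conn.execute(
        """
        INSERT INTO weather_score (region_code, date, score)
        VALUES (?, ?, ?)
        ON CONFLICT (region_code, date) DO UPDATE SET score = excluded.score
        """,
        (region_code, date, score),
    )
    conn.commit()


def demo() -> list:
    conn = sqlite3.connect(":memory:")
    # Hypothetical table: the composite primary key plays the role of the MERGE ON clause.
    conn.execute(
        "CREATE TABLE weather_score ("
        " region_code TEXT, date TEXT, score REAL,"
        " PRIMARY KEY (region_code, date))"
    )
    upsert_score(conn, "108", "2025-06-07", 71.5)  # first run
    upsert_score(conn, "108", "2025-06-07", 80.0)  # rerun for the same key
    return conn.execute(
        "SELECT region_code, date, score FROM weather_score"
    ).fetchall()


if __name__ == "__main__":
    print(demo())
```

Running the load twice leaves a single row holding the latest score, which is the behavior the MERGE statement is meant to provide on reruns and backfills.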

🔎 Problem 4: A string of task execution errors to debug

  • Ran into a variety of problems: NoneType errors, list index errors, Slack message format errors, and so on

✅ Solution

  • Debugged in this order: check the logs → print variables → add conditional branches
if not filtered:
    print("No weekend/public holiday data, skipping load")
    return None

📚 What I learned

  • Log analysis and variable tracing are the key to resolving errors
  • Defensive coding that accounts for edge cases matters
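
The guard above only helps if the downstream task also tolerates None. A rough sketch of how the two ends could fit together is shown below. It is an illustration under assumptions, not the project code: the record keys `date` and `temp` are hypothetical, the holiday check uses a plain set of dates instead of the holidays package to keep the example dependency-free, and the summary is reduced to an average temperature.

```python
from datetime import date
from typing import Dict, List, Optional, Set


def filter_days(records: List[Dict], holiday_dates: Set[date]) -> List[Dict]:
    """Keep records that fall on a Saturday/Sunday or on a listed holiday."""
    return [
        r for r in records
        if r["date"].weekday() >= 5 or r["date"] in holiday_dates
    ]


def transform(records: List[Dict], holiday_dates: Set[date]) -> Optional[Dict]:
    filtered = filter_days(records, holiday_dates)
    if not filtered:
        # Nothing to score for this region and week: signal "skip" with None.
        print("No weekend/public holiday data, skipping load")
        return None
    temps = [r["temp"] for r in filtered]
    return {"days": len(filtered), "avg_temp": sum(temps) / len(temps)}


def load(transformed: Optional[Dict]) -> bool:
    """Return True if a row would be written, False if the load is skipped."""
    if transformed is None:
        return False
    # The real pipeline would call insert_weather_score() here.
    return True


if __name__ == "__main__":
    week = [
        {"date": date(2025, 6, 4), "temp": 20.0},  # Wednesday
        {"date": date(2025, 6, 6), "temp": 24.0},  # Friday, treated as a holiday
        {"date": date(2025, 6, 7), "temp": 26.0},  # Saturday
    ]
    print(load(transform(week, {date(2025, 6, 6)})))
```

Returning None from transform and checking for it explicitly in load keeps an empty week from surfacing later as a NoneType or index error.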

🔎 Problem 5: Slack bot integration and alert format design

  • After collecting data for every region and computing satisfaction scores, send the results to the team Slack channel as an automatic alert
  • Sort the top 3 and bottom 3 regions by satisfaction and format them so they are easy to read

✅ Solution

import requests
from airflow.models import Variable


def send_slack_message(message: str):
    # Read the webhook URL from the Airflow Variable "slack_webhook_url"
    webhook_url = Variable.get("slack_webhook_url")
    payload = {"text": message}
    response = requests.post(webhook_url, json=payload)

    if response.status_code == 200:
        print("✅ Slack message sent")
    else:
        print(f"❌ Slack send failed: {response.status_code}, {response.text}")

Store the Slack Webhook URL as an Airflow Variable (read with Variable.get)

Build the alert message with text alignment and the format_slack_message() function

In the notify() task, gather all results and then send the Slack message
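
The ranking step inside notify() could look something like the sketch below. The actual format_slack_message() is not shown in this post, so `format_message_sketch` is a hypothetical stand-in, and the dictionary keys `region` and `score` are assumptions. None entries (regions whose transform returned nothing) are dropped before sorting.

```python
from typing import Dict, List, Optional, Tuple


def pick_top_bottom(
    results: List[Optional[Dict]], n: int = 3
) -> Tuple[List[Dict], List[Dict]]:
    """Return the n best and n worst regions by score, without overlap."""
    valid = [r for r in results if r is not None]
    ranked = sorted(valid, key=lambda r: r["score"], reverse=True)
    top = ranked[:n]
    # Start the bottom slice no earlier than index n so a short list
    # never reports the same region in both groups.
    bottom = ranked[max(n, len(ranked) - n):][::-1]  # lowest score first
    return top, bottom


def format_message_sketch(results: List[Optional[Dict]], n: int = 3) -> str:
    top, bottom = pick_top_bottom(results, n)
    lines = [f"Top {len(top)} regions"]
    lines += [f"{i}. {r['region']}: {r['score']:.1f}" for i, r in enumerate(top, 1)]
    lines.append(f"Bottom {len(bottom)} regions")
    lines += [f"{i}. {r['region']}: {r['score']:.1f}" for i, r in enumerate(bottom, 1)]
    return "\n".join(lines)


if __name__ == "__main__":
    sample = [{"region": name, "score": s} for name, s in
              [("A", 90), ("B", 80), ("C", 70), ("D", 60), ("E", 50), ("F", 40), ("G", 30)]]
    print(format_message_sketch(sample + [None]))
```

The returned string can be passed directly to send_slack_message() as the message text.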

📚 What I learned

  • Connecting analysis results to alerts and visualization, rather than just storing them, is important for delivering insights

  • Being able to share results quickly with teammates is also very useful for real-time collaboration


5. Takeaways & Reflections

  • I got hands-on experience with backfill, idempotency, dynamic task mapping, and other Airflow techniques that are essential in practice
  • I defined the extract, transform, load, and notify tasks myself and automated the pipeline
  • Through error debugging, log checking, parameter passing, and every other step, I got to experience data engineering firsthand

🔗 6. GitHub Link

👉 View the full project code
