Implementing a DAG to Collect Country Information via REST API and Load It into Redshift

Jinyoung Cheon · May 29, 2025

I implemented an Airflow DAG that fetches country information through a REST API and loads it into a database (Redshift).

Link
https://github.com/jinyoung0711/learn-airflow/tree/main/country_info

Features & task specification

🔹 Task specification

  1. Implemented as a Full Refresh: every run recreates the table and overwrites the entire dataset

  2. API URL: https://restcountries.com/v3.1/all

  3. Extracted fields (with exception handling):

    • name → ['name']['official']

    • population → ['population']

    • area → ['area']

  4. Redshift load target: the country_info table in my own schema

  5. Schedule: runs every Saturday at 06:30 (UTC)
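
The schedule in item 5 corresponds to the cron expression `30 6 * * 6` (weekday 6 is Saturday in standard cron). The sketch below is only a way to sanity-check that reading with the standard library; `WEEKLY_CRON` and `next_saturday_run` are hypothetical names, and the schedule string actually used in the repository is not shown in this post.

```python
from datetime import datetime, timedelta, timezone

# Cron fields: minute hour day-of-month month day-of-week.
# "30 6 * * 6" reads as 06:30 on every Saturday.
WEEKLY_CRON = "30 6 * * 6"


def next_saturday_run(after: datetime) -> datetime:
    """Return the first Saturday 06:30 UTC strictly after `after`.

    `after` is expected to be a timezone-aware datetime.
    """
    after = after.astimezone(timezone.utc)
    candidate = after.replace(hour=6, minute=30, second=0, microsecond=0)
    # datetime.weekday() counts Monday as 0, so Saturday is 5.
    days_ahead = (5 - candidate.weekday()) % 7
    candidate += timedelta(days=days_ahead)
    if candidate <= after:
        # Already at or past this week's slot, so move to next week.
        candidate += timedelta(days=7)
    return candidate
```

Passing a cron string like this as the DAG's `schedule` argument is one common way to express a weekly run in Airflow.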

📂 Implementation files

  1. country_info_dag.py

    • DAG definition, with extract/load structured using the @task decorator

    • Includes the schedule and the target Redshift schema

  2. country_utils.py

    • API request and JSON parsing (get_country_data())

    • Connects to Redshift, then creates the table and runs the INSERTs (load_country_data())

    • Handles special characters and exceptions during data transformation
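
As a rough illustration of the parsing and exception handling described for country_utils.py, here is a minimal sketch. It is not the code from the repository: `parse_countries` is a hypothetical helper, it assumes the API response has already been decoded into a list of dicts, and skipping incomplete records is just one possible policy.

```python
from typing import Any


def parse_countries(payload: list[dict[str, Any]]) -> list[tuple[str, int, float]]:
    """Pull (official name, population, area) out of each country record."""
    rows = []
    for item in payload:
        try:
            name = item["name"]["official"]
            population = int(item["population"])
            area = float(item["area"])
        except (KeyError, TypeError, ValueError):
            # Assumption: records with a missing or unusable field are skipped
            # instead of failing the whole run.
            continue
        rows.append((name, population, area))
    return rows
```

Keeping the parsing separate from the HTTP call makes it easy to check against a hand-written sample without touching the network.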

🚀 DAG structure

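from airflow import DAG
from airflow.decorators import task

# Assumes country_utils.py is importable from the DAG folder
from country_utils import get_country_data, load_country_data

with DAG(...) as dag:
    @task
    def extract():
        # Request the REST API and parse the JSON response
        return get_country_data()

    @task
    def load(data):
        # Recreate the country_info table in the given schema and insert all rows
        return load_country_data("jk990711", "country_info", data)

    # The return value of extract() is handed to load() as its input
    load(extract())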
extract() → fetch the JSON from the REST API

load() → load into Redshift as a FULL REFRESH
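
To make the full-refresh step concrete, the sketch below builds the SQL statements such a load might run: drop the table, recreate it, and insert every row inside one transaction. This is an assumption-laden illustration, not the body of load_country_data(): `sql_literal` and `full_refresh_statements` are hypothetical helpers, and the column types are guesses.

```python
def sql_literal(text: str) -> str:
    """Quote a string for SQL by doubling any embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def full_refresh_statements(schema: str, table: str, rows) -> list[str]:
    """Build the statements for a full refresh of schema.table."""
    target = f"{schema}.{table}"
    statements = [
        "BEGIN;",
        f"DROP TABLE IF EXISTS {target};",
        # Column types are assumptions made for this sketch.
        f"CREATE TABLE {target} (name varchar(256), population bigint, area float);",
    ]
    for name, population, area in rows:
        statements.append(
            f"INSERT INTO {target} VALUES "
            f"({sql_literal(name)}, {int(population)}, {float(area)});"
        )
    statements.append("COMMIT;")
    return statements
```

Wrapping everything between BEGIN and COMMIT means a failed run can be rolled back instead of leaving the table half-populated. With a real database driver, bound query parameters are usually a safer choice than manual quoting.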

✅ Results

In the Airflow UI, the DAG runs on its schedule and the country information is stored in Redshift successfully

The data ends up in a table with the columns name, population, and area
