Airflow ์์ REST API๋ฅผ ํตํด ๊ฐ์ ธ์จ ๊ตญ๊ฐ์ ๋ณด๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค(Redshift)์ ์ ์ฌํ๋ DAG์ ๊ตฌํํ๋ค.
๋งํฌ
https://github.com/jinyoung0711/learn-airflow/tree/main/country_info
Full Refresh ๋ฐฉ์์ผ๋ก ๊ตฌํํ์ฌ, ์คํ ์๋ง๋ค ํ ์ด๋ธ์ ์๋ก ์์ฑํ๊ณ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๋ฎ์ด์
API URL: https://restcountries.com/v3.1/all
์ถ์ถ ํญ๋ชฉ (์์ธ ์ฒ๋ฆฌ ํฌํจ):
name โ ['name']['official']
population โ ['population']
area โ ['area']
Redshift ์ ์ฌ ๋์: ๋ณธ์ธ ์คํค๋ง์ country_info ํ ์ด๋ธ
์ค์ผ์ค๋ง: ๋งค์ฃผ ํ ์์ผ ์ค์ 6์ 30๋ถ(UTC ๊ธฐ์ค)์ ์คํ
country_info_dag.py
DAG ์ ์, @task ๋ฐ์ฝ๋ ์ดํฐ๋ฅผ ํ์ฉํ extract/load ๊ตฌ์กฐํ
schedule ๋ฐ Redshift ๋์ ์คํค๋ง ์ง์ ํฌํจ
country_utils.py
API ์์ฒญ ๋ฐ JSON ํ์ฑ (get_country_data())
Redshift ์ปค๋ฅ์ ํ ํ ์ด๋ธ ์์ฑ ๋ฐ INSERT (load_country_data())
๋ฐ์ดํฐ ๋ณํ ์ ํน์๋ฌธ์ ์ฒ๋ฆฌ ๋ฐ ์์ธ ๋์ ํฌํจ
with DAG(...) as dag:
@task
def extract():
return get_country_data()
@task
def load(data):
return load_country_data("jk990711", "country_info", data)
load(extract())
extract() โ REST API์์ JSON ๊ฐ์ ธ์ค๊ธฐ
load() โ Redshift์ FULL REFRESH ๊ณ์ฐ ์ ์ฌ
Airflow UI์์ DAG๊ฐ ์ฃผ๊ธฐ์ ์ผ๋ก ์คํ๋๋ฉฐ ๊ตญ๊ฐ ์ ๋ณด๊ฐ ์ฑ๊ณต์ ์ผ๋ก Redshift์ ์ ์ฅ๋จ

๋ฐ์ดํฐ๋ name, population, area ์ปฌ๋ผ์ผ๋ก ๊ตฌ์ฑ๋ ํ ์ด๋ธ๋ก ์ ๋ฆฌ๋จ