ETL 과정중 첫번째로 해야할 부분이 데이터를 추출해내는 것이다.
OpenAPI를 이용하려고하고 KEY는 사전에 발급을 받았다.Python Operator을 사용해서 외부 API호출을 통해 데이터를 로컬에 저장해보려 한다.
request 메서드를 사용하는 것은 동일하나 크게 2가지를 고려해볼 수 있는데 다음과 같다.
데이터를 가져오는 함수를 모듈로 작성하고 import하여 PythonOperator의 python_callable에 함수를 전달하는 방식이 있다.
python경로를 환경변수에 추가해주면 airflow에서 인식하는 경로와 같게되어 바로 dags를 실행할 수 있으며, import해오는 방식이기 때문에 api를 함수에 정의하고 dags에 해당 키가 나오지 않아도 된다는 장점이 있다. wsl에 pull을 한 후에 해당 환경에서 api key를 작성해주어야 한다는 번거로움이 존재한다.두번째 방식은 DAGS에 함께 함수를 정의하여 @task 혹은 첫번째와 같은 방식으로 함수를 인자에 전달하는 방식이 있다.
wsl에서 추가적인 수정을 하지 않아도 된다는 점이 매력적이었지만, dags 스크립트에 api를 대놓고 작성해놓아야 한다는 치명적인 단점이 존재했다.airflow에서 자체적으로 제공하는 airflow.models의 Variable모듈이다.

Airflow서비스를 띄우고 Admin > Variables 탭에서 key를 설정해놓을 수 있다.
Key 부분에 api, key등등.. 이름이 들어가면 자동으로 *처리를 해준다.
Variable 메서드를 이용하여 key를 호출한 모습from airflow.models import Variable
api_key = Variable.get("apikey_openapi_nexon")
api 데이터호출 dagsfrom airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
import pendulum
import time
import requests
import json
from pprint import pprint
# params
# api key
api_key = Variable.get("apikey_openapi_nexon")
# 날짜 파싱
target_date = datetime.now().strftime("%Y-%m-%d")
# DAG
with DAG(
dag_id = "dags_get_data_python_operator",
schedule= "0 10 * * *",
start_date= pendulum.datetime(2024,12,11, tz = "Asia/Seoul"),
catchup=False
) as dag :
# GET DATA FUCTION
def get_data():
pprint(f"{target_date} 의 rankingdata 호출을 시작합니다.")
headers = {
"x-nxopen-api-key" : f"{api_key}",
"User-agent" : "Mozilla/5.0"
}
# json 파일을 담을 객체
mydata = []
# 1페이지당 200명의 랭킹정보
for i in range(1,2):
if i % 20 == 0:
time.sleep(15)
url = f"https://open.api.nexon.com/maplestory/v1/ranking/overall?date={target_date}&world_name=%EC%97%98%EB%A6%AC%EC%8B%9C%EC%9B%80&page={i}"
req = requests.get(url = url, headers = headers)
data = req.json()
mydata.append(data)
else:
# 페이지 정보 추가
url = f"https://open.api.nexon.com/maplestory/v1/ranking/overall?date={target_date}&world_name=%EC%97%98%EB%A6%AC%EC%8B%9C%EC%9B%80&page={i}"
req = requests.get(url = url, headers = headers)
data = req.json()
mydata.append(data)
pprint(mydata)
# 호출된 데이터 객체를 data 디렉토리에 저장
file_path = f"/opt/airflow/data/ranking_{target_date}.json"
with open (file_path, "w", encoding = "UTF-8-SIG") as f:
json.dump(mydata
,f
,ensure_ascii=False
,indent='\t'
)
print("done")
# task
get_data_ = PythonOperator(
task_id = "get_data",
python_callable=get_data
)
get_data_
저장위치가 airflow의 컨테이너 내부이지만, 나의 로컬 data 디렉터리에 마운트되어 있기 때문에 로컬에도 저장이 되어 있고 이를 사용할 수 있게 됐다.
해당 데이터를 이제 pyspark를 이용하여 정제하고 데이터 모델링을 진행할 예정이다.
해야할 것)
spark 이미지pull 하고 container 빌드하기docker-compose 파일에서 network 추가spark-submit을 위한 airflow컨테이너 빌드 (with dockerfile)