PythonOperator

복준수·2024년 12월 15일

🏹 1. ETL중 데이터 수집하기

ETL 과정중 첫번째로 해야할 부분이 데이터를 추출해내는 것이다.

  • 내가 즐겨하는 게임사의 OpenAPI를 이용하려고하고 KEY는 사전에 발급을 받았다.
  • 데이터를 요청하는 방식은 친절하게 설명이 되어 있었다.

https://openapi.nexon.com/ko/

🏹 2. API 호출키 숨기기

Python Operator을 사용해서 외부 API호출을 통해 데이터를 로컬에 저장해보려 한다.
request 메서드를 사용하는 것은 동일하나 크게 2가지를 고려해볼 수 있는데 다음과 같다.

  • 데이터를 가져오는 함수를 모듈로 작성하고 import하여 PythonOperatorpython_callable에 함수를 전달하는 방식이 있다.

    • 장점) 해당 방식의 경우 python경로를 환경변수에 추가해주면 airflow에서 인식하는 경로와 같게되어 바로 dags를 실행할 수 있으며, import해오는 방식이기 때문에 api를 함수에 정의하고 dags에 해당 키가 나오지 않아도 된다는 장점이 있다.
    • 단점) 하지만 wslpull을 한 후에 해당 환경에서 api key를 작성해주어야 한다는 번거로움이 존재한다.
  • 두번째 방식은 DAGS에 함께 함수를 정의하여 @task 혹은 첫번째와 같은 방식으로 함수를 인자에 전달하는 방식이 있다.

    • 이 방식은 wsl에서 추가적인 수정을 하지 않아도 된다는 점이 매력적이었지만, dags 스크립트에 api를 대놓고 작성해놓아야 한다는 치명적인 단점이 존재했다.
    • 이러한 보안상 취약점을 해결해줄 수 있는 부분이 존재했는데 바로 airflow에서 자체적으로 제공하는 airflow.modelsVariable모듈이다.

🏹 3. Variable 설정

Airflow서비스를 띄우고 Admin > Variables 탭에서 key를 설정해놓을 수 있다.
Key 부분에 api, key등등.. 이름이 들어가면 자동으로 *처리를 해준다.

  • Variable 메서드를 이용하여 key를 호출한 모습
from airflow.models import Variable
api_key = Variable.get("apikey_openapi_nexon")
  • 최종으로 작성된 api 데이터호출 dags
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from datetime import datetime
import pendulum
import time
import requests
import json
from pprint import pprint

# params
# api key
api_key = Variable.get("apikey_openapi_nexon")
# 날짜 파싱
target_date = datetime.now().strftime("%Y-%m-%d")

# DAG
with DAG(
    dag_id = "dags_get_data_python_operator",
    schedule= "0 10 * * *",
    start_date= pendulum.datetime(2024,12,11, tz = "Asia/Seoul"),
    catchup=False
) as dag :
    # GET DATA FUCTION
    def get_data():
        pprint(f"{target_date} 의 rankingdata 호출을 시작합니다.")
        headers = {
        "x-nxopen-api-key" : f"{api_key}",
        "User-agent" : "Mozilla/5.0"
        }
        # json 파일을 담을 객체
        mydata = []
        # 1페이지당 200명의 랭킹정보
        for i in range(1,2):
            if i % 20 == 0:
                time.sleep(15)
                url = f"https://open.api.nexon.com/maplestory/v1/ranking/overall?date={target_date}&world_name=%EC%97%98%EB%A6%AC%EC%8B%9C%EC%9B%80&page={i}"
                req = requests.get(url = url, headers = headers)
                data = req.json()
                mydata.append(data)
            else:
                # 페이지 정보 추가
                url = f"https://open.api.nexon.com/maplestory/v1/ranking/overall?date={target_date}&world_name=%EC%97%98%EB%A6%AC%EC%8B%9C%EC%9B%80&page={i}"
                req = requests.get(url = url, headers = headers)
                data = req.json()
                mydata.append(data)
        pprint(mydata)
        # 호출된 데이터 객체를 data 디렉토리에 저장
        file_path = f"/opt/airflow/data/ranking_{target_date}.json"
        with open (file_path, "w", encoding = "UTF-8-SIG") as f:
            json.dump(mydata
                      ,f
                      ,ensure_ascii=False
                      ,indent='\t'
                        )
        print("done")

    # task
    get_data_ = PythonOperator(
        task_id = "get_data",
        python_callable=get_data
    )

    get_data_

🏹 4. next

  • 저장위치가 airflow의 컨테이너 내부이지만, 나의 로컬 data 디렉터리에 마운트되어 있기 때문에 로컬에도 저장이 되어 있고 이를 사용할 수 있게 됐다.

  • 해당 데이터를 이제 pyspark를 이용하여 정제하고 데이터 모델링을 진행할 예정이다.

  • 해야할 것)

      1. spark 이미지pull 하고 container 빌드하기
      1. docker-compose 파일에서 network 추가
      1. spark-submit을 위한 airflow컨테이너 빌드 (with dockerfile)

0개의 댓글