처음 혼자 연습했던 알라딘의 API를 이용해서 중고책들의 정보를 가져오는 코드를 통해서 매일 같은시간에 데이터를 받아오기 위해서 어떤 워크플로우가 좋을지 luigi와 Airflow 고민하던중 다양한 커뮤니티와 다양한 통합및 플러그인이 있는 Airflow에서 간단하게 작업을 수행해보겠습니다.
import requests
import json
TTBKey = '발급받으신 TTBKey'
book_dic = {}
book_lists=[]
for i in range(1,11):
url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={i}&output=js&Version=20131101&OptResult=usedList"
res = requests.get(url)
items = json.loads(res.text)['item']
for items2 in items:
book_dic['itemId']=items2['itemId']# 책 아이디
book_dic['title']=items2['title'][5:] #제목
book_dic['author']=items2['author'] # 글쓴이
book_dic['priceStandard']=items2['priceStandard'] # 원가
book_dic['SitepriceSales']=items2['subInfo']['newBookList'][0]['priceSales'] # 사이트 새책 할인가
book_dic['priceSales']=items2['priceSales'] # 할인가
book_dic['categoryName_L']=items2['categoryName'].split('>')[1] # 큰 카테고리
book_dic['categoryName_S']=items2['categoryName'].split('>')[2] # 작은 카테고리
book_dic['customerReviewRank']=items2['customerReviewRank'] # 순위
book_lists.append(book_dic)
print(f'{i}번째 입니다')
- 이제 위의 코드를 통해서 DAG를 작성해보겠습니다.
하지만 위에서의 코드에서 카테고리를 나누는곳에서 문자열이 아닌 공백으로 되어있는곳이 있기 때문에 그런곳은 None로 하였습니다.
- 연습목적으로 csv파일을 저장하고 불러오는것을 하였습니다.
from datetime import date, datetime
import requests
import pandas as pd
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import os
dag = DAG(
dag_id="aladin_book", #dag id
description="aladin book data", #dag의 설명
start_date=datetime(2023, 5, 24, 0, 0), # 시작 날짜 및 시간 설정
schedule_interval='30 16 * * *', #날짜 상관없이 주기적으로 받을 시간
)
def _get_symbol():
pathlib.Path("/home/airflow/data").mkdir(parents=True, exist_ok=True)
TTBKey = '받아오신 TTBKey'
items = []
for start_value in range(1, 11):
url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={start_value}&output=js&Version=20131101&OptResult=usedList"
res = requests.get(url)
items.extend(res.json()['item'])
pd.DataFrame(items).to_csv("/home/airflow/data/aladinbook.csv", index=False)
def _get_data():
total = []
today = str(date.today()).replace("-","")
book = pd.read_csv("/home/airflow/data/aladinbook.csv")
for index, items2 in book.iterrows():
book_dic = {
'itemId': items2['itemId'],
'title': items2['title'][5:],
'author': items2['author'],
'priceStandard': items2['priceStandard'],
'priceSales': items2['priceSales'],
'customerReviewRank': items2['customerReviewRank'],
}
if isinstance(items2['categoryName'], str): #문자열인 경우에만 데이터를 처리
category_splits = items2['categoryName'].split('>')
book_dic['categoryName_L'] = category_splits[1]
book_dic['categoryName_S'] = category_splits[2]
else:
book_dic['categoryName_L'] = None
book_dic['categoryName_S'] = None
subInfo = json.loads(items2['subInfo'].replace("'", '"'))
if 'newBookList' in subInfo and subInfo['newBookList']:
book_dic['SitepriceSales'] = subInfo['newBookList'][0]['priceSales']
else:
book_dic['SitepriceSales'] = None
total.append(book_dic)
pd.DataFrame(total).to_csv(f"/home/airflow/data/aladin{today}.csv", index=False)
get_url = PythonOperator(
task_id="get_url", python_callable=_get_url, dag=dag
)
get_data = PythonOperator(
task_id="get_data", python_callable=_get_data, dag=dag
)
get_url >> get_data #순서
처음 작성했던 그대로 DAG에 넣었을때 안됐었지만 그럴때마다 로그를 확인하면서 에러를 분석할수 있었습니다.