⚙️ Airflow에 알라딘 크롤링 자동화

문해피·2023년 5월 31일
0

airflow

목록 보기
4/5

처음 혼자 연습했던 알라딘의 API를 이용해서 중고책들의 정보를 가져오는 코드를 통해서 매일 같은시간에 데이터를 받아오기 위해서 어떤 워크플로우가 좋을지 luigi와 Airflow 고민하던중 다양한 커뮤니티와 다양한 통합및 플러그인이 있는 Airflow에서 간단하게 작업을 수행해보겠습니다.

🐜 연습했던 크롤링 코드


import requests
import json

TTBKey = '발급받으신 TTBKey'
book_dic = {}
book_lists=[]

for i in range(1,11):
    url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={i}&output=js&Version=20131101&OptResult=usedList"
    res = requests.get(url)
    items = json.loads(res.text)['item']
    for items2 in items:
        book_dic['itemId']=items2['itemId']# 책 아이디
        book_dic['title']=items2['title'][5:] #제목
        book_dic['author']=items2['author'] # 글쓴이
        book_dic['priceStandard']=items2['priceStandard'] # 원가
        book_dic['SitepriceSales']=items2['subInfo']['newBookList'][0]['priceSales'] # 사이트 새책 할인가
        book_dic['priceSales']=items2['priceSales'] # 할인가
        book_dic['categoryName_L']=items2['categoryName'].split('>')[1] # 큰 카테고리
        book_dic['categoryName_S']=items2['categoryName'].split('>')[2] # 작은 카테고리
        book_dic['customerReviewRank']=items2['customerReviewRank'] # 순위
        book_lists.append(book_dic)
    print(f'{i}번째 입니다')

🐜 Airflow의 workflow 작성하기

  • 이제 위의 코드를 통해서 DAG를 작성해보겠습니다.
    하지만 위에서의 코드에서 카테고리를 나누는곳에서 문자열이 아닌 공백으로 되어있는곳이 있기 때문에 그런곳은 None로 하였습니다.
  • 연습목적으로 csv파일을 저장하고 불러오는것을 하였습니다.
from datetime import date, datetime 
import requests
import pandas as pd
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import os

dag = DAG(
    dag_id="aladin_book", #dag id
    description="aladin book data", #dag의 설명
    start_date=datetime(2023, 5, 24, 0, 0),  # 시작 날짜 및 시간 설정
    schedule_interval='30 16 * * *', #날짜 상관없이 주기적으로 받을 시간 
)


def _get_symbol():
    pathlib.Path("/home/airflow/data").mkdir(parents=True, exist_ok=True)
    TTBKey = '받아오신 TTBKey'
    items = []

    for start_value in range(1, 11):
        url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={start_value}&output=js&Version=20131101&OptResult=usedList"
        res = requests.get(url)
        items.extend(res.json()['item'])

    pd.DataFrame(items).to_csv("/home/airflow/data/aladinbook.csv", index=False)


def _get_data():
    total = []
    today = str(date.today()).replace("-","")
    book = pd.read_csv("/home/airflow/data/aladinbook.csv")
    for index, items2 in book.iterrows():
        book_dic = {
            'itemId': items2['itemId'],
            'title': items2['title'][5:],
            'author': items2['author'],
            'priceStandard': items2['priceStandard'],
            'priceSales': items2['priceSales'],
            'customerReviewRank': items2['customerReviewRank'],
        }

        if isinstance(items2['categoryName'], str): #문자열인 경우에만 데이터를 처리
            category_splits = items2['categoryName'].split('>')
            book_dic['categoryName_L'] = category_splits[1]
            book_dic['categoryName_S'] = category_splits[2]
        else:
            book_dic['categoryName_L'] = None
            book_dic['categoryName_S'] = None

        subInfo = json.loads(items2['subInfo'].replace("'", '"'))

        if 'newBookList' in subInfo and subInfo['newBookList']:
            book_dic['SitepriceSales'] = subInfo['newBookList'][0]['priceSales']
        else:
            book_dic['SitepriceSales'] = None

        total.append(book_dic)
    pd.DataFrame(total).to_csv(f"/home/airflow/data/aladin{today}.csv", index=False)
    
get_url = PythonOperator(
	task_id="get_url", python_callable=_get_url, dag=dag
)

get_data = PythonOperator(
    task_id="get_data", python_callable=_get_data, dag=dag
)

get_url >> get_data #순서
  • 이렇게 하게되면 처음 Airflow컨테이너를 만들었을때 로컬의 data라는 폴더와 컨테이너안의 data라는 폴더를 볼륨시켰기 때문에 코드에서는 컨테이너에서만 파일이 저장되었지만 결과적으로는 로컬의 data라는 폴더에서도 볼수 있게 됩니다.

처음 작성했던 그대로 DAG에 넣었을때 안됐었지만 그럴때마다 로그를 확인하면서 에러를 분석할수 있었습니다.

  • 성찰한 것들
  1. 받아온 데이터가 문자열이 아니라 아무것도 없는 값일때 어떻게 처리를 해야하는지
  2. 받아온 데이터가 일반적인 JSON형태를 구분해서 어떻게 처리해야하는지
  3. 최종으로 저장된 데이터를 관계형 데이터 베이스에 저장 가능 여부를 확인해 볼 수 없는지
  4. 왜 많은 워크플로우 관리 도구중 Airflow인지
profile
행복하려고 개발공부하는 문광식의 로그파일입니다.

0개의 댓글