크롤링을 한사이트에서만 하는게 아닌 여러 사이트에서 데이터를 수집할 예정이다. 할 수 있는 한도 내에서 최대한 데이터 정합성을 맞추기 위해 Airflow를 통해 전체적인 작업을 스케줄링하고자 한다.
전체적인 설계는 다음과 같다.
수집할 쿠팡 카테고리 아이디를 미리 수집해두고 해당 카테고리의 마지막 페이지가 몇 페이지인지 확인한다. 확인한 페이지를 바탕으로 URL를 생성하여 크롤링을 진행하고 원시데이터는 PostgresSQL에 적재한다.
Airflow를 로컬에서 간편하게 구동시키기 위하여 Docker로 필요한 컨테이너를 먼저 구축했다.
각 컨테이너의 역할은 다음과 같다.
airflow-cli
airflow init
airflow worker
airflow scheduler
airflow triggerer
airflow webserver
redis
코드는 다음 링크를 참고하길 바란다.
https://github.com/barabobBOB/PriceTracker/blob/%234/docker-compose.yml
이제 Airflow로 Job를 나눠야 하기 때문에 앞전에서 작성한 코드를 수정해보자.
def check_last_page(category_id: int) -> int:
response = requests.get(construct_url(category_id, 1), headers=set_header())
crawling_waiting_time()
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
page = soup.find('div', class_='product-list-paging')
return int(page['data-total'])
def get_last_pages(categories_id: int, idx: int, **context) -> None:
last_pages = [check_last_page(category_id) for category_id in categories_id]
context["task_instance"].xcom_push(key="last_pages_" + idx, value=last_pages)
def create_url_list(categories_id: list[int], idx: int, **context) -> None:
last_pages = context["task_instance"].xcom_pull(
key="last_pages_" + idx
)
url_list = [
[construct_url(category_id, page), category_id]
for category_id, last_page in zip(categories_id, last_pages)
for page in range(1, last_page + 1)
]
context["task_instance"].xcom_push(key="url_list_" + idx, value=url_list)
xcom_push
를 활용해서 이전 작업들의 결과값을 저장하고 이를 다음 작업에서 꺼내어 사용할 수 있도록 구현하였다. get_last_pages
로 마지막 페이지가 몇 페이지인지 찾고 create_url_list
를 통해서 URL를 만든다.
크롤링한 원시데이터를 저장하기 위해서 데이터베이스 핸들러를 작성하였다. 특히, airflow.providers
에서 PostgresHook을 활용해서 간단하게 쿼리문을 작성해봤다. 데이터베이스 연결과 커밋 등 손수 해줘야 하는 작업이 많지만 PostgresHook을 활용하면 간편하게 데이터를 적재할 수 있다.
import logging
from airflow.providers.postgres.hooks.postgres import PostgresHook
class DatabaseHandler:
def __init__(self):
self.hook = PostgresHook(postgres_conn_id='postgres_conn')
self.logger = logging.getLogger(self.__class__.__name__)
def create_coupang_products(self):
query = """
CREATE TABLE IF NOT EXISTS coupang_products (
id SERIAL PRIMARY KEY,
product_id BIGINT,
title TEXT,
price TEXT,
per_price TEXT,
star FLOAT,
review_count TEXT,
category_id BIGINT
)
"""
self.hook.run(query)
self.logger.info("Table coupang_products created successfully.")
def insert_product(self, product_id, title, price, per_price, star, review_count, category_id):
query = """
INSERT INTO coupang_products (product_id, title, price, per_price, star, review_count, category_id)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
self.hook.run(query, parameters=(product_id, title, price, per_price, star, review_count, category_id))
self.logger.info(f"Product {title} inserted successfully.")
DatabaseHandler
를 바탕으로 데이터를 적재하는 부분만 바꾸어주었다.
class CoupangCrawler:
def __init__(self) -> None:
self.logger = setup_logging()
self.db_handler = DatabaseHandler()
self.db_handler.create_coupang_products()
def crawl(self, idx: int, **context) -> None:
url_list = context["task_instance"].xcom_pull(
key="url_list_" + idx
)
collection_datetime = datetime.datetime.now()
for url in url_list:
self.crawl_page(url[0], url[1], idx, collection_datetime, **context)
crawling_waiting_time()
def crawl_page(self, url: str, category_id: int, idx: int, collection_datetime: datetime, **context) -> None:
response = requests.get(url, headers=set_header())
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
try:
items = soup.find('ul', id='productList').find_all('li')
self.extract_items(items, category_id, collection_datetime)
except AttributeError as e:
error_info = {
"error_message": str(e),
"failed_url": url,
"index": idx,
"success": False,
"timestamp": collection_datetime
}
context["task_instance"].xcom_push(key="error_log_" + idx, value=error_info)
def extract_items(self, items: list[bs4.BeautifulSoup], category_id: int, collection_datetime: datetime) -> None:
product = {}
for item in items:
try:
product["product_id"] = item['data-product-id']
product["title"] = item.find('div', class_='name').text
product["price"] = item.find('strong', class_='price-value').text
product["star"] = item.find('em', class_='rating').text
product["per_price"] = item.find('span', class_='unit-price').text
product["review_count"] = item.find('span', class_='rating-total-count').text
product["category_id"] = category_id
product["collection_datetime"] = collection_datetime
self.db_handler.insert_product(product)
except Exception:
# 리뷰, 별점 등의 정보가 없는 경우
continue
Python의 pendulum
모듈을 사용하여 한국 시간대를 설정하였다. 그 외 필요한 정보들을 입력해준다.
(사실 개인 프로젝트이니 굳이 필요하지 않긴 하지만? 있으니 넣어주자.)
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from crawling.coupang import crawling
kst = pendulum.timezone("Asia/Seoul")
default_args = {
'owner': 'seyeon',
'depends_on_past': False,
'start_date': datetime(2024, 6, 28),
'email': ['choi20014830@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
from airflow.operators.python import PythonOperator
with DAG(
dag_id='coupang_crawling',
default_args=default_args,
start_date=datetime(2024, 6, 29, tzinfo=kst),
description='쿠팡 식료품 카테고리별 크롤링',
schedule_interval='@once',
tags=['test']
) as dag:
get_last_pages_task = PythonOperator(
task_id='get_last_pages',
python_callable=crawling.get_last_pages,
)
create_url_list = PythonOperator(
task_id='create_url_list',
python_callable=crawling.create_url_list,
)
coupang_crawling = PythonOperator(
task_id='coupang_crawling',
python_callable=crawling.crawl
)
get_last_pages_task >> create_url_list >> coupang_crawling
PythonOperator를 활용하여 기존에 설계한 크롤링 코드를 바탕으로 DAG를 작성하였다.
설계한 대로 마지막 페이지 확인 → URL 생성 → 크롤링(+ 원시데이터저장)하는 DAG를 완성하였다.
http://localhost:8080/ 에 접속하면 Airflow 모니터링 UI를 확인할 수 있다.
확인해보고 Task를 돌려본 결과, 정상적으로 작동했음을 알 수 있다.
여기서 문제점은 무려 실행시간이 58분이 걸린다는 사실이다.
또한, 만약 중간에 에러가 발생했을 때는 어떻게 대처를 해야하는 지에 대한 명시가 되어있지 않다. 그저 동작에만 취중한 DAG이다. 이를 개선하기 위한 여정은 다음 장에서 계속 됩니다..
Project LINK -> https://github.com/barabobBOB/PriceTracker