Airflow를 활용한 데이터 수집 스케줄링

최세연·2024년 7월 11일
0
post-thumbnail

개요

크롤링을 한사이트에서만 하는게 아닌 여러 사이트에서 데이터를 수집할 예정이다. 할 수 있는 한도 내에서 최대한 데이터 정합성을 맞추기 위해 Airflow를 통해 전체적인 작업을 스케줄링하고자 한다.

DAG 설계

전체적인 설계는 다음과 같다.

수집할 쿠팡 카테고리 아이디를 미리 수집해두고 해당 카테고리의 마지막 페이지가 몇 페이지인지 확인한다. 확인한 페이지를 바탕으로 URL를 생성하여 크롤링을 진행하고 원시데이터는 PostgresSQL에 적재한다.

Docker 컨테이너 구축

Airflow를 로컬에서 간편하게 구동시키기 위하여 Docker로 필요한 컨테이너를 먼저 구축했다.

각 컨테이너의 역할은 다음과 같다.

airflow-cli

  • Airflow CLI를 실행하여 Airflow 명령을 실행할 수 있는 서비스이다.

airflow init

  • Airflow의 초기화 명령어로, Airflow 환경을 설정하는 데 사용된다.
  • 데이터베이스를 초기화하고, Airflow의 기본 설정 파일을 생성한다.

airflow worker

  • 실제로 작업(task)을 실행하는 데 사용된다.

airflow scheduler

  • 스케줄러는 DAG 파일을 주기적으로 읽고 작업의 종속성을 확인하며 실행할 준비가 된 태스크를 큐에 넣는다.

airflow triggerer

  • 특정 조건이나 이벤트에 따라 태스크를 트리거하는 역할을 한다.

airflow webserver

  • 사용자 인터페이스(UI)를 제공하며, 이를 통해 사용자는 DAGs를 관리하고 모니터링할 수 있다.

redis

  • Airflow에서 CeleryExecutor를 사용하기 위한 메시지 브로커 역할을 수행한다.

코드는 다음 링크를 참고하길 바란다.

https://github.com/barabobBOB/PriceTracker/blob/%234/docker-compose.yml

코드 수정

이제 Airflow로 Job를 나눠야 하기 때문에 앞전에서 작성한 코드를 수정해보자.

def check_last_page(category_id: int) -> int:
    response = requests.get(construct_url(category_id, 1), headers=set_header())
    crawling_waiting_time()
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    page = soup.find('div', class_='product-list-paging')
    return int(page['data-total'])

def get_last_pages(categories_id: int, idx: int, **context) -> None:
    last_pages = [check_last_page(category_id) for category_id in categories_id]
    context["task_instance"].xcom_push(key="last_pages_" + idx, value=last_pages)

def create_url_list(categories_id: list[int], idx: int, **context) -> None:
    last_pages = context["task_instance"].xcom_pull(
        key="last_pages_" + idx
    )
    url_list = [
        [construct_url(category_id, page), category_id]
        for category_id, last_page in zip(categories_id, last_pages)
        for page in range(1, last_page + 1)
    ]
    context["task_instance"].xcom_push(key="url_list_" + idx, value=url_list)

xcom_push를 활용해서 이전 작업들의 결과값을 저장하고 이를 다음 작업에서 꺼내어 사용할 수 있도록 구현하였다. get_last_pages 로 마지막 페이지가 몇 페이지인지 찾고 create_url_list 를 통해서 URL를 만든다.

크롤링한 원시데이터를 저장하기 위해서 데이터베이스 핸들러를 작성하였다. 특히, airflow.providers에서 PostgresHook을 활용해서 간단하게 쿼리문을 작성해봤다. 데이터베이스 연결과 커밋 등 손수 해줘야 하는 작업이 많지만 PostgresHook을 활용하면 간편하게 데이터를 적재할 수 있다.

import logging

from airflow.providers.postgres.hooks.postgres import PostgresHook

class DatabaseHandler:
    def __init__(self):
        self.hook = PostgresHook(postgres_conn_id='postgres_conn')
        self.logger = logging.getLogger(self.__class__.__name__)

    def create_coupang_products(self):
        query = """
        CREATE TABLE IF NOT EXISTS coupang_products (
            id SERIAL PRIMARY KEY,
            product_id BIGINT,
            title TEXT,
            price TEXT,
            per_price TEXT,
            star FLOAT,
            review_count TEXT,
            category_id BIGINT
        )
        """
        self.hook.run(query)
        self.logger.info("Table coupang_products created successfully.")

    def insert_product(self, product_id, title, price, per_price, star, review_count, category_id):
        query = """
        INSERT INTO coupang_products (product_id, title, price, per_price, star, review_count, category_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        self.hook.run(query, parameters=(product_id, title, price, per_price, star, review_count, category_id))
        self.logger.info(f"Product {title} inserted successfully.")

DatabaseHandler 를 바탕으로 데이터를 적재하는 부분만 바꾸어주었다.

class CoupangCrawler:
    def __init__(self) -> None:
        self.logger = setup_logging()
        self.db_handler = DatabaseHandler()
        self.db_handler.create_coupang_products()

    def crawl(self, idx: int, **context) -> None:
        url_list = context["task_instance"].xcom_pull(
            key="url_list_" + idx
        )
        collection_datetime = datetime.datetime.now()
        for url in url_list:
            self.crawl_page(url[0], url[1], idx, collection_datetime, **context)
            crawling_waiting_time()

    def crawl_page(self, url: str, category_id: int, idx: int, collection_datetime: datetime, **context) -> None:
        response = requests.get(url, headers=set_header())
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        try:
            items = soup.find('ul', id='productList').find_all('li')
            self.extract_items(items, category_id, collection_datetime)

        except AttributeError as e:
            error_info = {
                "error_message": str(e),
                "failed_url": url,
                "index": idx,
                "success": False,
                "timestamp": collection_datetime
            }
            context["task_instance"].xcom_push(key="error_log_" + idx, value=error_info)

    def extract_items(self, items: list[bs4.BeautifulSoup], category_id: int, collection_datetime: datetime) -> None:
        product = {}
        for item in items:
            try:
                product["product_id"] = item['data-product-id']
                product["title"] = item.find('div', class_='name').text
                product["price"] = item.find('strong', class_='price-value').text
                product["star"] = item.find('em', class_='rating').text
                product["per_price"] = item.find('span', class_='unit-price').text
                product["review_count"] = item.find('span', class_='rating-total-count').text
                product["category_id"] = category_id
                product["collection_datetime"] = collection_datetime
                self.db_handler.insert_product(product)

            except Exception:
                # 리뷰, 별점 등의 정보가 없는 경우
                continue

DAG

Python의 pendulum 모듈을 사용하여 한국 시간대를 설정하였다. 그 외 필요한 정보들을 입력해준다.

(사실 개인 프로젝트이니 굳이 필요하지 않긴 하지만? 있으니 넣어주자.)

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from crawling.coupang import crawling

kst = pendulum.timezone("Asia/Seoul")

default_args = {
    'owner': 'seyeon',
    'depends_on_past': False,
    'start_date': datetime(2024, 6, 28),
    'email': ['choi20014830@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
from airflow.operators.python import PythonOperator

with DAG(
        dag_id='coupang_crawling',
        default_args=default_args,
        start_date=datetime(2024, 6, 29, tzinfo=kst),
        description='쿠팡 식료품 카테고리별 크롤링',
        schedule_interval='@once',
        tags=['test']
) as dag:
    get_last_pages_task = PythonOperator(
        task_id='get_last_pages',
        python_callable=crawling.get_last_pages,
    )

    create_url_list = PythonOperator(
        task_id='create_url_list',
        python_callable=crawling.create_url_list,
    )

    coupang_crawling = PythonOperator(
        task_id='coupang_crawling',
        python_callable=crawling.crawl
    )

    get_last_pages_task >> create_url_list >> coupang_crawling

PythonOperator를 활용하여 기존에 설계한 크롤링 코드를 바탕으로 DAG를 작성하였다.

설계한 대로 마지막 페이지 확인 → URL 생성 → 크롤링(+ 원시데이터저장)하는 DAG를 완성하였다.

http://localhost:8080/ 에 접속하면 Airflow 모니터링 UI를 확인할 수 있다.

확인해보고 Task를 돌려본 결과, 정상적으로 작동했음을 알 수 있다.

문제점

여기서 문제점은 무려 실행시간이 58분이 걸린다는 사실이다.

또한, 만약 중간에 에러가 발생했을 때는 어떻게 대처를 해야하는 지에 대한 명시가 되어있지 않다. 그저 동작에만 취중한 DAG이다. 이를 개선하기 위한 여정은 다음 장에서 계속 됩니다..

Project LINK -> https://github.com/barabobBOB/PriceTracker

profile
오물쪼물 코딩생활 ๑•‿•๑

0개의 댓글

관련 채용 정보