[Airflow] 주식 데이터 크롤링 파이프라인 구현하기 (1)

joowonseo·2025년 2월 23일
0
post-thumbnail

⏰ 온프레미스 환경에서 매일 정기적으로 전일의 정보를 Airflow를 통해 DB에 적재하는 과정을 기술한 글입니다.

Airflow?

Apache Airflow는 워크플로우(workflow) 자동화를 위한 오픈소스 플랫폼으로, 데이터 파이프라인을 효율적으로 관리하고 조율할 수 있도록 도와줍니다.

주요 특징

1. DAG(Directed Acyclic Graph) 기반 워크플로우 관리
Airflow는 작업을 DAG(방향성 비순환 그래프) 형태로 정의하여, 태스크(task) 간의 의존성을 명확하게 설정할 수 있습니다. DAG를 활용하면 복잡한 데이터 흐름을 논리적으로 구성하고, 실행 순서를 쉽게 조정할 수 있습니다.

2. 동적 워크플로우 작성
Airflow의 워크플로우는 단순한 JSON 또는 YAML 파일이 아니라, Python 코드로 정의됩니다. 이를 통해 조건문, 반복문, 변수 등을 활용하여 동적인 파이프라인을 쉽게 구성할 수 있습니다.

3. 다양한 스케줄링 및 실행 옵션
Airflow는 특정 주기(Cron 표현식 기반)나 이벤트 트리거를 활용하여 워크플로우를 자동으로 실행할 수 있습니다. 또한, 태스크 실행 시 병렬 처리 및 재시도 기능을 지원하여 안정성을 높입니다.

4. 강력한 모니터링 및 로깅 기능
Airflow는 웹 기반 UI를 제공하여 DAG 및 태스크 실행 상태를 직관적으로 모니터링할 수 있습니다. 또한, 로그를 저장하고 분석할 수 있는 기능을 갖추고 있어 디버깅이 용이합니다.

5. 확장성과 유연성
Airflow는 Celery Executor, Kubernetes Executor 등을 활용하여 확장 가능하며, AWS, GCP, Azure 등의 클라우드 환경과도 쉽게 통합할 수 있습니다. 또한, API 및 플러그인 시스템을 제공하여 다양한 시스템과 연동이 가능합니다.


🤔 Airflow를 사용한 이유

  • airflow 워크플로우 및 스케줄링을 구현하여 주식 기본 정보 크롤링
  • open api로 주식에 대한 기본 정보를 받아오는데에는 한계가 있기 때문에 KOSPI / KOSDAQ 주식에 대한 기본 정보 수집

Airflow 주식 데이터 크롤링 파이프라인 구현하기

한국 주식 공공데이터 라이브러리인 pykrx를 통해 데이터 수집

  • ticker 정보 조회
  • 일자별 OHLCV 조회
  • 시가 총액 조회
  • 일자별 DIV/BPS/PER/EPS 조회

Airflow 설치 및 환경 설정

# 가상 환경 활성화
source venv/bin/activate

# Airflow 설치
AIRFLOW_VERSION=2.10.3

PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 데이터베이스 초기화
airflow db init

# 관리자 사용자 생성
airflow users create \
    --username joowojr \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email codepingkea@gmail.com \
    --password codeping1!

디렉토리 구조

── dags
│   └── stock
│       ├── __init__.py
│       └── stock_data_dag.py
└── modules
    └── stock
        ├── __init__.py
        └── stock_data_collection.py
export PYTHONPATH=$PYTHONPATH:/home/codeping/FLEX-BE-airflow/airflow/dags

설정 파일 편집

홈 디렉토리에서 실행

sudo nano ~/airflow/airflow.cfg
# dag 위치 설정
export AIRFLOW_HOME=

# 시간 설정
default_timezone = Asia/Seoul

# 예제 Dag 비활성화
load_examples = False

DAG 파일 생성

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pendulum
import sys
import os

sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))

from modules.stock.stock_data_collection import (
    get_ticker_list,
    collect_ohlcv_data,
    collect_market_cap_data,
    collect_fundamental_data,
)
from modules.stock.database_connection import load_csv_to_mysql

local_tz = pendulum.timezone("Asia/Seoul")
now = pendulum.now("Asia/Seoul") 
today_date1 = now.strftime('%Y%m%d')
start_date = pendulum.datetime(2024, 11, 1, tz="Asia/Seoul")

default_args = {
    'owner': 'airflow',
    'start_date': start_date, 
    'retries': 0,
    'catchup': False
}

def get_tickers_and_return(**kwargs):
    kor_ticker_list_df = get_ticker_list()
    kor_ticker_list = kor_ticker_list_df['stockcode'].tolist()
    kwargs['ti'].xcom_push(key='kor_ticker_list', value=kor_ticker_list)
    return kor_ticker_list

def collect_ohlcv_with_tickers(**kwargs):
    kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
    collect_ohlcv_data(kor_ticker_list)

def collect_market_cap_with_tickers(**kwargs):
    kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
    collect_market_cap_data(kor_ticker_list)

def collect_fundamental_with_tickers(**kwargs):
    kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
    collect_fundamental_data(kor_ticker_list)

with DAG(
    dag_id='data_collection_and_loading_dag',
    default_args=default_args,
    schedule_interval='0 6 * * *',  
    catchup=False,
    tags=['pykrx'],
) as dag:

    get_tickers_task = PythonOperator(
        task_id='get_ticker_list',
        python_callable=get_tickers_and_return,
        provide_context=True,
        dag=dag,
    )

    collect_ohlcv_task = PythonOperator(
        task_id='collect_ohlcv_data',
        python_callable=collect_ohlcv_with_tickers,
        op_kwargs={'start_date': start_date, 'today_date1': today_date1},
        provide_context=True,
        dag=dag,
    )

    collect_market_cap_task = PythonOperator(
        task_id='collect_market_cap_data',
        python_callable=collect_market_cap_with_tickers,
        op_kwargs={'start_date': start_date, 'today_date1': today_date1},
        provide_context=True,
        dag=dag,
    )

    collect_fundamental_task = PythonOperator(
        task_id='collect_fundamental_data',
        python_callable=collect_fundamental_with_tickers,
        op_kwargs={'start_date': start_date, 'today_date1': today_date1},
        provide_context=True,
        dag=dag,
    )

    load_to_mysql_task = PythonOperator(
        task_id='load_csv_to_mysql',
        python_callable=load_csv_to_mysql,
        op_kwargs={'start_date': start_date, 'today_date1': today_date1},
        provide_context=True,
        dag=dag,
    )

    get_tickers_task >> [collect_ohlcv_task, collect_market_cap_task, collect_fundamental_task] >> load_to_mysql_task

requirements 편집

pip freeze > requirements.txt
pip install -r requirements.txt

Airflow 웹 서버 및 스케줄러 시작

airflow webserver -p 8080 &
airflow scheduler

# 백그라운드 실행
nohup airflow standalone > airflow.log 2>&1 &

nohup airflow webserver > /home/codeping/airflow/logs/webserver.log 2>&1 &
nohup airflow scheduler > /home/codeping/airflow/logs/scheduler.log 2>&1 &

Airflow web dashboard

📔 참고 자료
https://unfinishedgod.netlify.app/2023/07/29/airflow-1/

profile
백엔드 개발자 ˚₊✩‧₊ ໒꒱

0개의 댓글