
⏰ 온프레미스 환경에서 매일 정기적으로 전일의 정보를 Airflow를 통해 DB에 적재하는 과정을 기술한 글입니다.
Apache Airflow는 워크플로우(workflow) 자동화를 위한 오픈소스 플랫폼으로, 데이터 파이프라인을 효율적으로 관리하고 조율할 수 있도록 도와줍니다.
1. DAG(Directed Acyclic Graph) 기반 워크플로우 관리
Airflow는 작업을 DAG(방향성 비순환 그래프) 형태로 정의하여, 태스크(task) 간의 의존성을 명확하게 설정할 수 있습니다. DAG를 활용하면 복잡한 데이터 흐름을 논리적으로 구성하고, 실행 순서를 쉽게 조정할 수 있습니다.
2. 동적 워크플로우 작성
Airflow의 워크플로우는 단순한 JSON 또는 YAML 파일이 아니라, Python 코드로 정의됩니다. 이를 통해 조건문, 반복문, 변수 등을 활용하여 동적인 파이프라인을 쉽게 구성할 수 있습니다.
3. 다양한 스케줄링 및 실행 옵션
Airflow는 특정 주기(Cron 표현식 기반)나 이벤트 트리거를 활용하여 워크플로우를 자동으로 실행할 수 있습니다. 또한, 태스크 실행 시 병렬 처리 및 재시도 기능을 지원하여 안정성을 높입니다.
4. 강력한 모니터링 및 로깅 기능
Airflow는 웹 기반 UI를 제공하여 DAG 및 태스크 실행 상태를 직관적으로 모니터링할 수 있습니다. 또한, 로그를 저장하고 분석할 수 있는 기능을 갖추고 있어 디버깅이 용이합니다.
5. 확장성과 유연성
Airflow는 Celery Executor, Kubernetes Executor 등을 활용하여 확장 가능하며, AWS, GCP, Azure 등의 클라우드 환경과도 쉽게 통합할 수 있습니다. 또한, API 및 플러그인 시스템을 제공하여 다양한 시스템과 연동이 가능합니다.
한국 주식 공공데이터 라이브러리인 pykrx를 통해 데이터 수집
# 가상 환경 활성화
source venv/bin/activate
# Airflow 설치
AIRFLOW_VERSION=2.10.3
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 데이터베이스 초기화
airflow db init
# 관리자 사용자 생성
airflow users create \
--username joowojr \
--firstname Admin \
--lastname User \
--role Admin \
--email codepingkea@gmail.com \
--password codeping1!
── dags
│ └── stock
│ ├── __init__.py
│ └── stock_data_dag.py
└── modules
└── stock
├── __init__.py
└── stock_data_collection.py
export PYTHONPATH=$PYTHONPATH:/home/codeping/FLEX-BE-airflow/airflow/dags
홈 디렉토리에서 실행
sudo nano ~/airflow/airflow.cfg
# dag 위치 설정
export AIRFLOW_HOME=
# 시간 설정
default_timezone = Asia/Seoul
# 예제 Dag 비활성화
load_examples = False

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pendulum
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from modules.stock.stock_data_collection import (
get_ticker_list,
collect_ohlcv_data,
collect_market_cap_data,
collect_fundamental_data,
)
from modules.stock.database_connection import load_csv_to_mysql
local_tz = pendulum.timezone("Asia/Seoul")
now = pendulum.now("Asia/Seoul")
today_date1 = now.strftime('%Y%m%d')
start_date = pendulum.datetime(2024, 11, 1, tz="Asia/Seoul")
default_args = {
'owner': 'airflow',
'start_date': start_date,
'retries': 0,
'catchup': False
}
def get_tickers_and_return(**kwargs):
kor_ticker_list_df = get_ticker_list()
kor_ticker_list = kor_ticker_list_df['stockcode'].tolist()
kwargs['ti'].xcom_push(key='kor_ticker_list', value=kor_ticker_list)
return kor_ticker_list
def collect_ohlcv_with_tickers(**kwargs):
kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
collect_ohlcv_data(kor_ticker_list)
def collect_market_cap_with_tickers(**kwargs):
kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
collect_market_cap_data(kor_ticker_list)
def collect_fundamental_with_tickers(**kwargs):
kor_ticker_list = kwargs['ti'].xcom_pull(key='kor_ticker_list', task_ids='get_ticker_list')
collect_fundamental_data(kor_ticker_list)
with DAG(
dag_id='data_collection_and_loading_dag',
default_args=default_args,
schedule_interval='0 6 * * *',
catchup=False,
tags=['pykrx'],
) as dag:
get_tickers_task = PythonOperator(
task_id='get_ticker_list',
python_callable=get_tickers_and_return,
provide_context=True,
dag=dag,
)
collect_ohlcv_task = PythonOperator(
task_id='collect_ohlcv_data',
python_callable=collect_ohlcv_with_tickers,
op_kwargs={'start_date': start_date, 'today_date1': today_date1},
provide_context=True,
dag=dag,
)
collect_market_cap_task = PythonOperator(
task_id='collect_market_cap_data',
python_callable=collect_market_cap_with_tickers,
op_kwargs={'start_date': start_date, 'today_date1': today_date1},
provide_context=True,
dag=dag,
)
collect_fundamental_task = PythonOperator(
task_id='collect_fundamental_data',
python_callable=collect_fundamental_with_tickers,
op_kwargs={'start_date': start_date, 'today_date1': today_date1},
provide_context=True,
dag=dag,
)
load_to_mysql_task = PythonOperator(
task_id='load_csv_to_mysql',
python_callable=load_csv_to_mysql,
op_kwargs={'start_date': start_date, 'today_date1': today_date1},
provide_context=True,
dag=dag,
)
get_tickers_task >> [collect_ohlcv_task, collect_market_cap_task, collect_fundamental_task] >> load_to_mysql_task
requirements 편집
pip freeze > requirements.txt
pip install -r requirements.txt
airflow webserver -p 8080 &
airflow scheduler
# 백그라운드 실행
nohup airflow standalone > airflow.log 2>&1 &
nohup airflow webserver > /home/codeping/airflow/logs/webserver.log 2>&1 &
nohup airflow scheduler > /home/codeping/airflow/logs/scheduler.log 2>&1 &

📔 참고 자료
https://unfinishedgod.netlify.app/2023/07/29/airflow-1/