
이번 포스팅 부터는 Titanic 데이터셋을 RandomforestClassifier을 이용해 생존자 예측을 진행하는 전반적인 파이프라인을 구축하려고 합니다.
타이타닉 생존자 예측 코드는 https://github.com/lsjsj92/airflow_tutorial 의 코드를 따라하였습니다.
파일은 config.py, dataIO.py, model.py, preprocess.py, titanic.py로 구성되어 있으며, titanic.py에서 전처리 및 예측 전반적 과정이 진행됩니다.
Titanic.py
import pandas as pd
from titanic.preprocess import TitanicPreprocess
from titanic.config import PathConfig
from titanic.dataio import DataIOStream
from titanic.model import TitanicModeling
class TitanicMain(TitanicPreprocess, PathConfig, TitanicModeling, DataIOStream):
def __init__(self):
TitanicPreprocess.__init__(self)
PathConfig.__init__(self)
TitanicModeling.__init__(self)
DataIOStream.__init__(self)
def prepro_data(self, f_name, **kwargs):
"""
Perform data preprocessing and save the preprocessed data.
Parameters:
f_name (str): The name of the input data file.
**kwargs: Additional keyword arguments.
Returns:
str: A message indicating the end of the preprocessing.
"""
data = self.get_data(self.titanic_path, f_name)
data = self.run_preprocessing(data)
data.to_csv(f"{self.titanic_path}/prepro_titanic.csv", index=False)
kwargs['task_instance'].xcom_push(key='prepro_csv', value=f"{self.titanic_path}/prepro_titanic")
return "end prepro"
def run_modeling(self, n_estimator, flag, **kwargs):
"""
Run the modeling process using the preprocessed data.
Parameters:
n_estimator (int): The number of estimators for the random forest model.
flag (bool): A flag indicating whether to load preprocessed data from a file.
**kwargs: Additional keyword arguments.
Returns:
str: A message indicating the end of the modeling process.
"""
f_name = kwargs["task_instance"].xcom_pull(key='prepro_csv')
data = self.get_data(self.titanic_path, f_name, flag)
X, y = self.get_X_y(data)
model_info = self.run_sklearn_modeling(X, y, n_estimator)
kwargs['task_instance'].xcom_push(key='result_msg', value=model_info)
return "end modeling"
1. 데이터 전처리
타이타닉 데이터가 저장된 위치로부터 데이터를 불러오고, 전처리 과정을 실행합니다.
그 후, 전처리된 데이터를 따로 저장합니다.
2. 모델링 진행
전처리 된 데이터를 이용하여 예측을 진행합니다.
여기에서 작업 사이에 데이터를 공유할 수 있도록 XComs가 이용되었습니다.
기본적으로 Task는 완전히 격리되어 실행될 수 있는데, 이때 Task끼리 서로 통신할 수 있도록 하는 매커니즘 입니다.
task_id와 dag_id로 식별되며, 작은 데이터를 전달하는데 이용됩니다.
xcom_push: 스토리지에 지정한 데이터를 저장합니다.
xcom_pull: 스토리지에 저장된 데이터를 불러옵니다. 저장된 데이터는 key로 구분합니다.
[1] https://lsjsj92.tistory.com/633
[2] https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html