머신러닝 예제 작성

박지은·2023년 6월 24일

Airflow Tutorial

목록 보기
3/4
post-thumbnail

이번 포스팅 부터는 Titanic 데이터셋을 RandomforestClassifier을 이용해 생존자 예측을 진행하는 전반적인 파이프라인을 구축하려고 합니다.

타이타닉 생존자 예측 코드는 https://github.com/lsjsj92/airflow_tutorial 의 코드를 따라하였습니다.

파일은 config.py, dataIO.py, model.py, preprocess.py, titanic.py로 구성되어 있으며, titanic.py에서 전처리 및 예측 전반적 과정이 진행됩니다.

Titanic.py


import pandas as pd

from titanic.preprocess import TitanicPreprocess
from titanic.config import PathConfig
from titanic.dataio import DataIOStream
from titanic.model import TitanicModeling

class TitanicMain(TitanicPreprocess, PathConfig, TitanicModeling, DataIOStream):
    def __init__(self):
        TitanicPreprocess.__init__(self)
        PathConfig.__init__(self)
        TitanicModeling.__init__(self)
        DataIOStream.__init__(self)

    def prepro_data(self, f_name, **kwargs):
        """
        Perform data preprocessing and save the preprocessed data.

        Parameters:
            f_name (str): The name of the input data file.
            **kwargs: Additional keyword arguments.

        Returns:
            str: A message indicating the end of the preprocessing.
        """
        data = self.get_data(self.titanic_path, f_name)
        data = self.run_preprocessing(data)
        data.to_csv(f"{self.titanic_path}/prepro_titanic.csv", index=False)
        kwargs['task_instance'].xcom_push(key='prepro_csv', value=f"{self.titanic_path}/prepro_titanic")
        return "end prepro"
    
    def run_modeling(self, n_estimator, flag, **kwargs):
        """
        Run the modeling process using the preprocessed data.

        Parameters:
            n_estimator (int): The number of estimators for the random forest model.
            flag (bool): A flag indicating whether to load preprocessed data from a file.
            **kwargs: Additional keyword arguments.

        Returns:
            str: A message indicating the end of the modeling process.
        """
        f_name = kwargs["task_instance"].xcom_pull(key='prepro_csv')
        data = self.get_data(self.titanic_path, f_name, flag)
        X, y = self.get_X_y(data)
        model_info = self.run_sklearn_modeling(X, y, n_estimator)
        kwargs['task_instance'].xcom_push(key='result_msg', value=model_info)
        return "end modeling" 

1. 데이터 전처리
타이타닉 데이터가 저장된 위치로부터 데이터를 불러오고, 전처리 과정을 실행합니다.
그 후, 전처리된 데이터를 따로 저장합니다.

2. 모델링 진행
전처리 된 데이터를 이용하여 예측을 진행합니다.

여기에서 작업 사이에 데이터를 공유할 수 있도록 XComs가 이용되었습니다.

XComs란?

기본적으로 Task는 완전히 격리되어 실행될 수 있는데, 이때 Task끼리 서로 통신할 수 있도록 하는 매커니즘 입니다.
task_id와 dag_id로 식별되며, 작은 데이터를 전달하는데 이용됩니다.

xcom_push: 스토리지에 지정한 데이터를 저장합니다.
xcom_pull: 스토리지에 저장된 데이터를 불러옵니다. 저장된 데이터는 key로 구분합니다.

Reference

[1] https://lsjsj92.tistory.com/633
[2] https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html

profile
Today I learned...

0개의 댓글