250127 TIL #600 AI Tech #132 P:데이터셋 전처리

김춘복·2025년 1월 27일
0

TIL : Today I Learned

목록 보기
602/627

Today I Learned

오늘은 데이터셋 전처리 코드를 클래스화 해서 작성했다.


AbstractPreProcessor

추상화 클래스. 데이터셋이 달라질 수 있으므로 기본 추상 클래스를 선 정의.

  • get_data() 함수를 사용하면 파일이 있으면 불러오고, 없으면 전처리 후 저장하는 로직

  • _pre_process만 @abstractmethod으로 처리해서 반드시 자식클래스에서 구현하게 처리했다.

import os
from abc import ABC, abstractmethod

import pandas as pd


class AbstractPreProcessor(ABC):
    def __init__(self, dataset: str, data_path: str, export_path: str):
        self.dataset = dataset
        self.data_path = data_path
        self.export_path = export_path
        self.export_dfs: dict[str, pd.DataFrame] = {}

    @abstractmethod
    def _pre_process(self) -> None:
        raise NotImplementedError("Not implemented pre_process method")

    def get_data(self) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        전처리된 데이터를 로드하거나 전처리 후 저장하는 메서드.
        """
        required_files = {
            "items": os.path.join(self.export_path, self.dataset, "items.csv"),
            "train": os.path.join(self.export_path, self.dataset, "train.csv"),
            "valid": os.path.join(self.export_path, self.dataset, "valid.csv"),
            "test": os.path.join(self.export_path, self.dataset, "test.csv"),
        }

        if all(os.path.exists(f) for f in required_files.values()):
            return self._load_data(required_files)
        return self._process_data()

    def _load_data(
        self, required_files: dict
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        저장된 데이터 파일들을 로드하는 메서드
        """
        print("Loading items, train, valid, test datasets from saved files...")
        try:
            items = pd.read_csv(required_files["items"])
            train = pd.read_csv(required_files["train"])
            valid = pd.read_csv(required_files["valid"])
            test = pd.read_csv(required_files["test"])
            return items, train, valid, test
        except Exception as e:
            print(f"Error loading files: {e}")
            raise

    def _process_data(
        self,
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        데이터를 전처리하고 저장하는 메서드
        """
        print("Processed data not found. Running pre_process...")
        self._pre_process()
        self._save_data()

        items = self.export_dfs.get("items")
        train = self.export_dfs.get("train")
        valid = self.export_dfs.get("valid")
        test = self.export_dfs.get("test")

        if any(df is None for df in [items, train, valid, test]):
            raise ValueError("Missing required DataFrame after preprocessing")

        return items, train, valid, test

    def _save_data(self) -> None:
        """
        전처리된 데이터를 파일로 저장하는 메서드.
        """
        os.makedirs(self.export_path, exist_ok=True)
        os.makedirs(os.path.join(self.export_path, self.dataset), exist_ok=True)

        for key, df in self.export_dfs.items():
            file_path = os.path.join(self.export_path, self.dataset, f"{key}.csv")
            print(f"Saving {key} to {file_path}")
            print(f"{key} column list is {df.columns} - shape({df.shape})")
            df.to_csv(file_path, index=False)

MovieLensPreProcessor

위의 추상클래스를 구현한 클래스. MovieLens 20m을 전처리

  • _pre_process를 상속해서 작성.

  • 평점이 4점 이상인 interaction만 positive.
    나머지 interaction은 get_user_interactions()로 가능.

  • leave-one-out 방식으로 train/valid/test 분할

import os
from collections import defaultdict

import numpy as np
import pandas as pd
from tqdm import tqdm

from .abstract_preprocessor import AbstractPreProcessor


class MovieLensPreProcessor(AbstractPreProcessor):
    def __init__(
        self,
        dataset: str,
        data_path: str,
        export_path: str,
    ):
        super().__init__(dataset, data_path, export_path)
        self.data = {
            "items": pd.read_csv(os.path.join(data_path, "movies.csv")),
            "ratings": pd.read_csv(os.path.join(data_path, "ratings.csv")),
            # "tags": pd.read_csv(os.path.join(data_path, "tags.csv")),
        }

    def get_user_interactions(self) -> defaultdict:
        """
        유저별 상호작용 아이템을 반환하는 함수
        """
        ratings = self.data["ratings"]
        user_interactions = defaultdict(list)
        for user_id, item_id in zip(ratings["userId"], ratings["movieId"]):
            user_interactions[user_id].append(item_id)

        return user_interactions

    def _pre_process(self) -> None:
        items = self.data["items"]
        ratings = self.data["ratings"]

        # items 전처리
        items = self._clean_movie_titles_and_years(items)
        # items = self._preprocess_genres(items)

        # ratings 전처리
        ratings.rename(
            columns={
                "movieId": "item_id",
                "userId": "user_id",
                "rating": "rating",
                "timestamp": "timestamp",
            },
            inplace=True,
        )

        # 유저 리뷰 수 기준 이상치 제거 및 train-test split
        ratings = self._filter_users_with_interactions(ratings)
        train, valid, test = self._split_train_valid_test(ratings)

        # 영화 일정 리뷰 수 이하 제거 (추후 구현 필요)

        # export_dfs에 전처리된 데이터 저장
        self.export_dfs = {
            "items": items,
            "train": train,
            "valid": valid,
            "test": test,
        }

    def _preprocess_genres(self, items: pd.DataFrame) -> pd.DataFrame:
        """
        추후 장르 원핫인코딩 or 멀티 인코딩 코드 상의 후 작성
        """
        return items

    def _clean_movie_titles_and_years(self, items) -> pd.DataFrame:
        """
        items에서 year 생성 및 결측치 처리
        title에서 year 제거
        movieId를 item_id로 변경
        """

        # items의 title에서 연도 제거
        items["year"] = items["title"].str.extract(r"\((\d{4})\)")
        # 제목에서 연도 부분 제거
        items["title"] = items["title"].str.replace(r" \(\d{4}\)", "", regex=True)
        # year 컬럼을 정수형으로 변환
        items["year"] = items["year"].fillna(0).astype(int)

        year_json = {
            "40697": 1993,
            "79607": 1970,
            "87442": 2010,
            "107434": 2009,
            "108548": 2007,
            "108583": 1975,
            "112406": 2019,
            "113190": 2021,
            "115133": 1996,
            "115685": 2011,
            "125571": 1990,
            "125632": 2002,
            "125958": 2008,
            "126438": 2013,
            "126929": 2014,
            "127005": 1991,
            "128612": 2015,
            "128734": 2014,
            "129651": 2010,
            "129705": 2014,
            "129887": 2003,
            "130454": 1993,
        }

        # year_json의 key와 value를 items 데이터프레임에 반영
        for key, value in year_json.items():
            key = int(key)
            items.loc[items["movieId"] == int(key), "year"] = value

        items.rename(columns={"movieId": "item_id"}, inplace=True)
        return items

    def _filter_users_with_interactions(self, ratings: pd.DataFrame) -> pd.DataFrame:
        """
        사용자별 상호작용 데이터를 필터링하고 interaction 컬럼을 생성
        """
        # IQR 우선 계산
        user_rating_counts = (
            ratings.groupby("user_id").size().reset_index(name="rating_count")
        )

        Q1 = user_rating_counts["rating_count"].quantile(0.25)
        Q3 = user_rating_counts["rating_count"].quantile(0.75)
        IQR = Q3 - Q1
        upper_fence = Q3 + 1.5 * IQR

        # 4.0 이상 평점이 2개 이하인 유저 필터링 (valid와 test, train에 하나씩 필요)
        user_high_ratings = (
            ratings[ratings["rating"] >= 4.0]
            .groupby("user_id")
            .size()
            .reset_index(name="high_rating_count")
        )
        valid_users = user_high_ratings[user_high_ratings["high_rating_count"] > 2][
            "user_id"
        ]
        filtered_ratings = ratings[ratings["user_id"].isin(valid_users)].copy()

        # IQR 기반 이상치 제거
        valid_users = user_rating_counts[
            user_rating_counts["rating_count"] <= upper_fence
        ]["user_id"]
        filtered_ratings = filtered_ratings[
            filtered_ratings["user_id"].isin(valid_users)
        ]

        # interaction 컬럼 생성
        filtered_ratings.loc[:, "interaction"] = (
            filtered_ratings["rating"] >= 4.0
        ).astype(int)

        return filtered_ratings

    def _split_train_valid_test(
        self, ratings
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        train_list, valid_list, test_list = [], [], []

        # user_id 기준 그룹화
        grouped = ratings.groupby("user_id", group_keys=False)

        for _, group in tqdm(grouped, desc="Splitting train/valid/test"):
            # positive,negative 상호작용 분리
            pos_mask = group["interaction"] == 1
            pos_interactions = group[pos_mask]

            # 상위 2개 timestamp 추출
            top2 = pos_interactions.nlargest(2, "timestamp")

            # Test/Valid 분리
            test_data = top2.iloc[[0]] if len(top2) >= 1 else None  # 최신 1개
            valid_data = top2.iloc[[1]] if len(top2) >= 2 else None  # 차순위 1개

            # Train 데이터 구성
            ## 남은 positive: 전체 positive에서 top2 제외
            train_data = pos_interactions.drop(top2.index, errors="ignore")

            # 데이터 추가
            if test_data is not None:
                test_list.append(test_data)
            if valid_data is not None:
                valid_list.append(valid_data)
            if not train_data.empty:
                train_list.append(train_data)

        # DataFrame 병합
        train = (
            pd.concat(train_list, ignore_index=True) if train_list else pd.DataFrame()
        )
        valid = (
            pd.concat(valid_list, ignore_index=True) if valid_list else pd.DataFrame()
        )
        test = pd.concat(test_list, ignore_index=True) if test_list else pd.DataFrame()

        return train, valid, test
profile
Backend Dev / Data Engineer

0개의 댓글

관련 채용 정보