오늘은 데이터셋 전처리 코드를 클래스화 해서 작성했다.
추상화 클래스. 데이터셋이 달라질 수 있으므로 기본 추상 클래스를 선 정의.
get_data()
함수를 사용하면 파일이 있으면 불러오고, 없으면 전처리 후 저장하는 로직
_pre_process
만 @abstractmethod으로 처리해서 반드시 자식클래스에서 구현하게 처리했다.
import os
from abc import ABC, abstractmethod
import pandas as pd
class AbstractPreProcessor(ABC):
def __init__(self, dataset: str, data_path: str, export_path: str):
self.dataset = dataset
self.data_path = data_path
self.export_path = export_path
self.export_dfs: dict[str, pd.DataFrame] = {}
@abstractmethod
def _pre_process(self) -> None:
raise NotImplementedError("Not implemented pre_process method")
def get_data(self) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
전처리된 데이터를 로드하거나 전처리 후 저장하는 메서드.
"""
required_files = {
"items": os.path.join(self.export_path, self.dataset, "items.csv"),
"train": os.path.join(self.export_path, self.dataset, "train.csv"),
"valid": os.path.join(self.export_path, self.dataset, "valid.csv"),
"test": os.path.join(self.export_path, self.dataset, "test.csv"),
}
if all(os.path.exists(f) for f in required_files.values()):
return self._load_data(required_files)
return self._process_data()
def _load_data(
self, required_files: dict
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
저장된 데이터 파일들을 로드하는 메서드
"""
print("Loading items, train, valid, test datasets from saved files...")
try:
items = pd.read_csv(required_files["items"])
train = pd.read_csv(required_files["train"])
valid = pd.read_csv(required_files["valid"])
test = pd.read_csv(required_files["test"])
return items, train, valid, test
except Exception as e:
print(f"Error loading files: {e}")
raise
def _process_data(
self,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
데이터를 전처리하고 저장하는 메서드
"""
print("Processed data not found. Running pre_process...")
self._pre_process()
self._save_data()
items = self.export_dfs.get("items")
train = self.export_dfs.get("train")
valid = self.export_dfs.get("valid")
test = self.export_dfs.get("test")
if any(df is None for df in [items, train, valid, test]):
raise ValueError("Missing required DataFrame after preprocessing")
return items, train, valid, test
def _save_data(self) -> None:
"""
전처리된 데이터를 파일로 저장하는 메서드.
"""
os.makedirs(self.export_path, exist_ok=True)
os.makedirs(os.path.join(self.export_path, self.dataset), exist_ok=True)
for key, df in self.export_dfs.items():
file_path = os.path.join(self.export_path, self.dataset, f"{key}.csv")
print(f"Saving {key} to {file_path}")
print(f"{key} column list is {df.columns} - shape({df.shape})")
df.to_csv(file_path, index=False)
위의 추상클래스를 구현한 클래스. MovieLens 20m을 전처리
_pre_process
를 상속해서 작성.
평점이 4점 이상인 interaction만 positive.
나머지 interaction은 get_user_interactions()로 가능.
leave-one-out 방식으로 train/valid/test 분할
import os
from collections import defaultdict
import numpy as np
import pandas as pd
from tqdm import tqdm
from .abstract_preprocessor import AbstractPreProcessor
class MovieLensPreProcessor(AbstractPreProcessor):
def __init__(
self,
dataset: str,
data_path: str,
export_path: str,
):
super().__init__(dataset, data_path, export_path)
self.data = {
"items": pd.read_csv(os.path.join(data_path, "movies.csv")),
"ratings": pd.read_csv(os.path.join(data_path, "ratings.csv")),
# "tags": pd.read_csv(os.path.join(data_path, "tags.csv")),
}
def get_user_interactions(self) -> defaultdict:
"""
유저별 상호작용 아이템을 반환하는 함수
"""
ratings = self.data["ratings"]
user_interactions = defaultdict(list)
for user_id, item_id in zip(ratings["userId"], ratings["movieId"]):
user_interactions[user_id].append(item_id)
return user_interactions
def _pre_process(self) -> None:
items = self.data["items"]
ratings = self.data["ratings"]
# items 전처리
items = self._clean_movie_titles_and_years(items)
# items = self._preprocess_genres(items)
# ratings 전처리
ratings.rename(
columns={
"movieId": "item_id",
"userId": "user_id",
"rating": "rating",
"timestamp": "timestamp",
},
inplace=True,
)
# 유저 리뷰 수 기준 이상치 제거 및 train-test split
ratings = self._filter_users_with_interactions(ratings)
train, valid, test = self._split_train_valid_test(ratings)
# 영화 일정 리뷰 수 이하 제거 (추후 구현 필요)
# export_dfs에 전처리된 데이터 저장
self.export_dfs = {
"items": items,
"train": train,
"valid": valid,
"test": test,
}
def _preprocess_genres(self, items: pd.DataFrame) -> pd.DataFrame:
"""
추후 장르 원핫인코딩 or 멀티 인코딩 코드 상의 후 작성
"""
return items
def _clean_movie_titles_and_years(self, items) -> pd.DataFrame:
"""
items에서 year 생성 및 결측치 처리
title에서 year 제거
movieId를 item_id로 변경
"""
# items의 title에서 연도 제거
items["year"] = items["title"].str.extract(r"\((\d{4})\)")
# 제목에서 연도 부분 제거
items["title"] = items["title"].str.replace(r" \(\d{4}\)", "", regex=True)
# year 컬럼을 정수형으로 변환
items["year"] = items["year"].fillna(0).astype(int)
year_json = {
"40697": 1993,
"79607": 1970,
"87442": 2010,
"107434": 2009,
"108548": 2007,
"108583": 1975,
"112406": 2019,
"113190": 2021,
"115133": 1996,
"115685": 2011,
"125571": 1990,
"125632": 2002,
"125958": 2008,
"126438": 2013,
"126929": 2014,
"127005": 1991,
"128612": 2015,
"128734": 2014,
"129651": 2010,
"129705": 2014,
"129887": 2003,
"130454": 1993,
}
# year_json의 key와 value를 items 데이터프레임에 반영
for key, value in year_json.items():
key = int(key)
items.loc[items["movieId"] == int(key), "year"] = value
items.rename(columns={"movieId": "item_id"}, inplace=True)
return items
def _filter_users_with_interactions(self, ratings: pd.DataFrame) -> pd.DataFrame:
"""
사용자별 상호작용 데이터를 필터링하고 interaction 컬럼을 생성
"""
# IQR 우선 계산
user_rating_counts = (
ratings.groupby("user_id").size().reset_index(name="rating_count")
)
Q1 = user_rating_counts["rating_count"].quantile(0.25)
Q3 = user_rating_counts["rating_count"].quantile(0.75)
IQR = Q3 - Q1
upper_fence = Q3 + 1.5 * IQR
# 4.0 이상 평점이 2개 이하인 유저 필터링 (valid와 test, train에 하나씩 필요)
user_high_ratings = (
ratings[ratings["rating"] >= 4.0]
.groupby("user_id")
.size()
.reset_index(name="high_rating_count")
)
valid_users = user_high_ratings[user_high_ratings["high_rating_count"] > 2][
"user_id"
]
filtered_ratings = ratings[ratings["user_id"].isin(valid_users)].copy()
# IQR 기반 이상치 제거
valid_users = user_rating_counts[
user_rating_counts["rating_count"] <= upper_fence
]["user_id"]
filtered_ratings = filtered_ratings[
filtered_ratings["user_id"].isin(valid_users)
]
# interaction 컬럼 생성
filtered_ratings.loc[:, "interaction"] = (
filtered_ratings["rating"] >= 4.0
).astype(int)
return filtered_ratings
def _split_train_valid_test(
self, ratings
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
train_list, valid_list, test_list = [], [], []
# user_id 기준 그룹화
grouped = ratings.groupby("user_id", group_keys=False)
for _, group in tqdm(grouped, desc="Splitting train/valid/test"):
# positive,negative 상호작용 분리
pos_mask = group["interaction"] == 1
pos_interactions = group[pos_mask]
# 상위 2개 timestamp 추출
top2 = pos_interactions.nlargest(2, "timestamp")
# Test/Valid 분리
test_data = top2.iloc[[0]] if len(top2) >= 1 else None # 최신 1개
valid_data = top2.iloc[[1]] if len(top2) >= 2 else None # 차순위 1개
# Train 데이터 구성
## 남은 positive: 전체 positive에서 top2 제외
train_data = pos_interactions.drop(top2.index, errors="ignore")
# 데이터 추가
if test_data is not None:
test_list.append(test_data)
if valid_data is not None:
valid_list.append(valid_data)
if not train_data.empty:
train_list.append(train_data)
# DataFrame 병합
train = (
pd.concat(train_list, ignore_index=True) if train_list else pd.DataFrame()
)
valid = (
pd.concat(valid_list, ignore_index=True) if valid_list else pd.DataFrame()
)
test = pd.concat(test_list, ignore_index=True) if test_list else pd.DataFrame()
return train, valid, test