dacon에 현재 열려 있는 "상추의 생육 환경 생성 AI 경진대회"를 준비해보려 한다.
먼저 baseline 코드를 그대로 써보고 이를 해석한 것을 쓴다.
출처: https://dacon.io/competitions/official/236033/codeshare/7081?page=1&dtype=recent
먼저 필요한 라이브러리를 import한다.
import random
import pandas as pd
import numpy as np
import os
import glob
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings(action='ignore')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
하이퍼파라미터 또한 CFG에 dictionary 형태로 저장한다.
혹시 나중에 hyperparameter를 추가하거나 변경해줄 때, 이 값을 이용하여 변경해주면 될듯하다.
CFG = {
'EPOCHS':30,
'LEARNING_RATE':1e-3,
'BATCH_SIZE':16,
'SEED':41
}
seed를 고정시킨다.
별로 중요해보이지 않지만 나중에 다시 결과를 재현할 때 꼭 필요한 작업이라 일단 해주는 것이 좋다. 물론 학습이 진행되는 데에는 영향을 끼치지 않는다.
def seed_everything(seed):
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True
seed_everything(CFG['SEED']) # Seed 고정
glob 라이브러리를 통해 train_input(target)내에 있는 모든 csv 파일들을 list에 넣는다. sorted를 통해 정렬된 상태로 list를 얻는다.
나 같은 경우는 구글 colab을 통해 실행시킬 것이기 때문에 DATA_PATH를 별도 설정해주었다.
DATA_PATH = "./drive/MyDrive/open"
all_input_list = sorted(glob.glob(f'{DATA_PATH}/train_input/*.csv'))
all_target_list = sorted(glob.glob(f'{DATA_PATH}/train_target/*.csv'))
그 다음은 data를 train set과 eval set으로 split해주는 작업인데, 여러 방법이 있겠지만 baseline에서는 shuffle 작업을 생략하고, 그냥 순서대로 split을 해주었다.
나는 그렇게 하긴 아쉬워서 shuffle을 추가하여 random하게 split을 해주었다.
random.shuffle(all_input_list) # baseline엔 없음
random.shuffle(all_target_list) # baseline엔 없음
train_input_list = all_input_list[:25]
train_target_list = all_target_list[:25]
val_input_list = all_input_list[25:]
val_target_list = all_target_list[25:]
다시 생각해보니 이렇게 input과 target을 따로 섞으면 dataset의 data, label 짝이 안 맞는 문제가 발생한다. 그래서 다음과 같은 방식으로 변경했다.
samples = set(random.sample(range(len(all_input_list)), k = 25))
train_input_list = []
train_target_list = []
val_input_list = []
val_target_list = []
for i in range(len(all_input_list)):
if i in samples:
train_input_list.append(all_input_list[i])
train_target_list.append(all_target_list[i])
else:
val_input_list.append(all_input_list[i])
val_target_list.append(all_target_list[i])
이제 model에 input을 넣기 전 더 좋은 형태로 넣기 위해 data들을 가공할 CustomDataset 클래스이다.
train data의 경우 시간 당 한 번, target data의 경우 하루에 한 번 측정한다.
그래서 train data 24개 묶음이 target data 1개와 하나의 data, label로 리턴되어야 한다.
class CustomDataset(Dataset):
def __init__(self, input_paths, target_paths, infer_mode):
self.input_paths = input_paths # input 데이터 경로 리스트 = train_input_list
self.target_paths = target_paths # target 데이터 경로 리스트 = train_target_list
self.infer_mode = infer_mode # True면 data만 리턴, False면 data 와 label 같이 리턴
self.data_list = []
self.label_list = []
print('Data Pre-processing..')
for input_path, target_path in tqdm(zip(self.input_paths, self.target_paths)):
input_df = pd.read_csv(input_path)
target_df = pd.read_csv(target_path)
input_df = input_df.drop(columns=['obs_time']) # "obs_time" column 삭제
input_df = input_df.fillna(0) # NaN 데이터를 0으로 채움
input_length = int(len(input_df)/24)
target_length = int(len(target_df))
for idx in range(target_length):
time_series = input_df[24*idx:24*(idx+1)].values
self.data_list.append(torch.Tensor(time_series))
for label in target_df["predicted_weight_g"]:
self.label_list.append(label)
print('Done.')
def __getitem__(self, index):
data = self.data_list[index]
label = self.label_list[index]
if self.infer_mode == False:
return data, label
else:
return data
def __len__(self):
return len(self.data_list)
train_dataset = CustomDataset(train_input_list, train_target_list, False)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=6)
val_dataset = CustomDataset(val_input_list, val_target_list, False)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=6)
Baseline에서의 Basemodel은 lstm에 FC layer를 붙인 방식을 채택했다.
대략적으로 (DAT, 내부온도관측치, · · ·, 일간누적총광량)와 같은 15개의 feature를 입력으로 주면, hidden_size가 256인 단방향 lstm을 통과하고 (batch_size, sequence_length, hidden_size)의 output이 나온다.
이 output 중 sequence_length의 마지막 output만을 가지고 classifier에 입력으로 주어 상추의 무게를 예측한다.
class BaseModel(nn.Module):
def __init__(self):
super(BaseModel, self).__init__()
self.lstm = nn.LSTM(input_size=15, hidden_size=256, batch_first=True, bidirectional=False)
# input_size는 feature의 개수와 동일(DAT, 내부온도관측치, · · ·, 일간누적총광량)
# output의 차원 = (batch_size, sequence_length, hidden_size), (h_n, c_n)
# h_n: n번 째 hidden_state
# c_n: n번 째 cell_state
self.classifier = nn.Sequential(
nn.Linear(256, 1),
)
def forward(self, x):
hidden, _ = self.lstm(x)
output = self.classifier(hidden[:,-1,:])
# lstm에 FC layer를 붙여 하나의 값이 return 되게 한다.
return output
train 함수
loss 함수로 L1 loss를 사용한다.
한 epoch이 끝날 때마다 validation loss를 측정하고, validation loss가 가장 낮은 모델을 return 한다.
def train(model, optimizer, train_loader, val_loader, scheduler, device):
model.to(device)
criterion = nn.L1Loss().to(device)
best_loss = 9999
best_model = None
for epoch in range(1, CFG['EPOCHS']+1):
model.train()
train_loss = []
for X, Y in iter(train_loader):
X = X.to(device)
Y = Y.to(device)
optimizer.zero_grad()
output = model(X)
loss = criterion(output, Y)
loss.backward()
optimizer.step()
train_loss.append(loss.item())
val_loss = validation(model, val_loader, criterion, device)
print(f'Train Loss : [{np.mean(train_loss):.5f}] Valid Loss : [{val_loss:.5f}]')
if scheduler is not None:
scheduler.step(val_loss)
if best_loss > val_loss:
best_loss = val_loss
best_model = model
return best_model
validation 함수
def validation(model, val_loader, criterion, device):
model.eval()
val_loss = []
with torch.no_grad():
for X, Y in iter(val_loader):
X = X.float().to(device)
Y = Y.float().to(device)
model_pred = model(X)
loss = criterion(model_pred, Y)
val_loss.append(loss.item())
return np.mean(val_loss)
학습 시작!!!
model = BaseModel() # model 인스턴스 생성
model.eval()
# Adam optimizer 사용
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
# ReduceLROnPlateau LR scheduler 사용
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, threshold_mode='abs',min_lr=1e-8, verbose=True)
best_model = train(model, optimizer, train_loader, val_loader, scheduler, device)
다시 glob를 통해 test 데이터 가져오기
test_input_list = sorted(glob.glob('./test_input/*.csv'))
test_target_list = sorted(glob.glob('./test_target/*.csv'))
inference 진행 후 제출할 csv 파일을 만드는 함수
def inference_per_case(model, test_loader, test_path, device):
model.to(device)
model.eval()
pred_list = []
with torch.no_grad():
for X in iter(test_loader):
X = X.float().to(device)
model_pred = model(X)
model_pred = model_pred.cpu().numpy().reshape(-1).tolist()
pred_list += model_pred
submit_df = pd.read_csv(test_path)
submit_df['predicted_weight_g'] = pred_list
submit_df.to_csv(test_path, index=False)
inference 실행
for test_input_path, test_target_path in zip(test_input_list, test_target_list):
test_dataset = CustomDataset([test_input_path], [test_target_path], True)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)
inference_per_case(best_model, test_loader, test_target_path, device)
생성된 파일 압축하기
import zipfile
os.chdir(f"{DATA_PATH}/test_target/")
submission = zipfile.ZipFile("../submission.zip", 'w')
for path in test_target_list:
path = path.split('/')[-1]
submission.write(path)
submission.close()