import os
from PIL import Image
import matplotlib.pyplot as plt
import cv2
import numpy as np
import torch
import torchvision
import torch.nn as nn
import time
import copy
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from tqdm.auto import tqdm
import torch.nn.functional as F
def conv_output_size(input_size, filter_size, padding, stride):
    """Return the spatial output size of a convolution.

    Standard conv arithmetic: floor((W - F + 2P) / S) + 1.
    """
    numerator = input_size - filter_size + 2 * padding
    return numerator // stride + 1
def conv_padding_size(input_size, filter_size, stride, output_size):
    """Return the padding needed so a conv produces `output_size`.

    Inverts the conv arithmetic: P = ((O - 1)*S - W + F) / 2. When the exact
    value is fractional (always x.5, since the numerator is an int), round up
    to the next integer — same result as the original, but via integer
    arithmetic instead of string-inspecting a float's repr. Returns an int
    (the original returned an equal-valued float).
    """
    numerator = (output_size - 1) * stride - input_size + filter_size
    print(numerator / 2)  # keep the original diagnostic print of the raw value
    # Integer ceiling of numerator/2: .5 cases round up, exact cases unchanged.
    return (numerator + 1) // 2
# Sanity checks for the conv-arithmetic helpers: a 3x3 conv with stride 1 and
# padding 1 preserves a 224px input ("same" padding).
print(conv_output_size(224,3,1,1))
print(conv_padding_size(224,3,1,224))
# Filenames in these folders encode the class id in their first two characters
# (see custom_dataset.__getitem__). NOTE(review): these two lists are never
# used below — custom_dataset re-lists the directories itself.
train_list = os.listdir('/home/mskang/hyeokjong/birds/rename_train')
val_list = os.listdir('/home/mskang/hyeokjong/birds/rename_valid')
class custom_dataset(Dataset):
    """Image-folder dataset whose labels are encoded in the filenames.

    Each file in `input_dir` is an image whose first two characters are the
    integer class id (e.g. "07_xxx.jpg" -> class 7). Images are read with
    OpenCV (BGR, HWC uint8) and converted to tensors via
    torchvision's to_tensor.
    """
    def __init__(self, input_dir, transform = None):
        self.input_dir = input_dir          # root folder of the split
        self.input_list = os.listdir(input_dir)  # one entry per image file
        self.transform = transform          # optional numpy-array transform
    def __len__(self):
        return len(self.input_list)
    def __getitem__(self,idx):
        # Join paths explicitly instead of os.chdir(): chdir mutates global
        # process state and races when DataLoader workers run in parallel.
        name = self.input_list[idx]
        input_image_numpy = cv2.imread(os.path.join(self.input_dir, name))
        target_class = int(name[0:2])
        if self.transform:
            input_image_numpy = self.transform(input_image_numpy)
        input_tensor = torchvision.transforms.functional.to_tensor(input_image_numpy)
        target_tensor = torch.tensor(target_class)
        return (input_tensor, target_tensor)
class RandomFlip(object):
    """Randomly mirror a numpy image along one or both axes.

    NOTE(review): each enabled axis is flipped when np.random.rand() > p,
    i.e. with probability 1 - p; at the default p = 0.5 this matches the
    usual reading of "flip with probability 0.5".
    """
    def __init__(self, horizontal = True, vertical = False, p = 0.5):
        self.horizontal = horizontal
        self.vertical = vertical
        self.p = p
    def __call__(self, inputs):
        out = inputs
        if self.horizontal and np.random.rand() > self.p:
            out = cv2.flip(out, 1)  # mirror left-right
        if self.vertical and np.random.rand() > self.p:
            out = cv2.flip(out, 0)  # mirror top-bottom
        return out
# Both splits get the random-horizontal-flip augmentation.
train_dataset = custom_dataset('/home/mskang/hyeokjong/birds/rename_train', RandomFlip())
val_dataset = custom_dataset('/home/mskang/hyeokjong/birds/rename_valid', RandomFlip())

device = 'cuda:1'
batch_size = 64

# NOTE(review): shuffle=True on the validation loader is unconventional but
# harmless here — epoch metrics are dataset-wide sums.
train_dl = DataLoader(train_dataset, batch_size, shuffle=True,
                      num_workers=4, pin_memory=True)
val_dl = DataLoader(val_dataset, batch_size, shuffle=True,
                    num_workers=4, pin_memory=True)

# Grab one batch from each loader as a smoke test and as input for the
# model-summary calls further down.
train_input, train_target = (t.to(device) for t in next(iter(train_dl)))
val_input, val_target = (t.to(device) for t in next(iter(val_dl)))
print(train_input.shape, train_target, val_input.shape, val_target, sep='\n')
class Flatten(nn.Module):
    """Collapse every dimension after the batch dimension into one (N, -1)."""
    def forward(self, input):
        batch_size = input.size(0)
        flattened = input.view(batch_size, -1)
        return flattened
class test_model(nn.Module):
    """Five-stage CNN for 224x224 RGB input with a 100-way classifier head.

    Stage 1 is a plain 3x3 conv; stages 2-5 are a 1x1 bottleneck conv
    followed by a 3x3 conv. Every stage ends with 2x2 max-pooling and batch
    norm, so five stages shrink 224 -> 7 spatially before the FC head.

    NOTE(review): there is no activation between the conv stages; the only
    nonlinearities in the trunk are the max-pools (the head has one ReLU).
    """
    def __init__(self):
        super().__init__()
        # Stage 1: 3 -> 50 channels, 224 -> 112 spatial.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=50, kernel_size=3, stride=1, padding=1, bias=False)
        self.maxpool1 = nn.MaxPool2d(2, 2)
        self.bn1 = nn.BatchNorm2d(50)
        # Stage 2: bottleneck 50 -> 10 -> 50, 112 -> 56.
        self.conv2_ = nn.Conv2d(in_channels=50, out_channels=10, kernel_size=1, stride=1, padding=0, bias=False)
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=50, kernel_size=3, stride=1, padding=1, bias=False)
        self.maxpool2 = nn.MaxPool2d(2, 2)
        self.bn2 = nn.BatchNorm2d(50)
        # Stage 3: bottleneck 50 -> 10 -> 50, 56 -> 28.
        self.conv3_ = nn.Conv2d(in_channels=50, out_channels=10, kernel_size=1, stride=1, padding=0, bias=False)
        self.conv3 = nn.Conv2d(in_channels=10, out_channels=50, kernel_size=3, stride=1, padding=1, bias=False)
        self.maxpool3 = nn.MaxPool2d(2, 2)
        self.bn3 = nn.BatchNorm2d(50)
        # Stage 4: bottleneck 50 -> 20 -> 100, 28 -> 14.
        self.conv4_ = nn.Conv2d(in_channels=50, out_channels=20, kernel_size=1, stride=1, padding=0, bias=False)
        self.conv4 = nn.Conv2d(in_channels=20, out_channels=100, kernel_size=3, stride=1, padding=1, bias=False)
        self.maxpool4 = nn.MaxPool2d(2, 2)
        self.bn4 = nn.BatchNorm2d(100)
        # Stage 5: bottleneck 100 -> 10 -> 100, 14 -> 7.
        self.conv5_ = nn.Conv2d(in_channels=100, out_channels=10, kernel_size=1, stride=1, padding=0, bias=False)
        self.conv5 = nn.Conv2d(in_channels=10, out_channels=100, kernel_size=3, stride=1, padding=1, bias=False)
        self.maxpool5 = nn.MaxPool2d(2, 2)
        self.bn5 = nn.BatchNorm2d(100)
        # Head: flatten 100x7x7, project to 100 logits.
        self.fc = nn.Sequential(Flatten(),
                                nn.Linear(100*7*7, 100),
                                nn.ReLU(),
                                nn.Linear(100, 100))
    def forward(self, inputs):
        # Each line is one conv stage: (bottleneck ->) conv -> pool -> bn.
        x = self.bn1(self.maxpool1(self.conv1(inputs)))
        x = self.bn2(self.maxpool2(self.conv2(self.conv2_(x))))
        x = self.bn3(self.maxpool3(self.conv3(self.conv3_(x))))
        x = self.bn4(self.maxpool4(self.conv4(self.conv4_(x))))
        x = self.bn5(self.maxpool5(self.conv5(self.conv5_(x))))
        return self.fc(x)
# Third-party model-summary utilities, imported mid-script notebook-style.
import pytorch_model_summary
from torchinfo import summary
model = test_model().to(device)
# Drive both summaries with the training batch fetched earlier.
x = train_input
print(pytorch_model_summary.summary(model, x, show_input=True))
print('!@#'*40)  # visual separator between the two summary outputs
summary(model, input_size = x.shape )
def get_lr(opt):
    """Return the learning rate of the optimizer's first param group.

    Returns None when the optimizer has no param groups (matching the
    original's implicit fall-through).
    """
    groups = opt.param_groups
    if not groups:
        return None
    return groups[0]['lr']
# Adam at the common default LR; ReduceLROnPlateau cuts the LR by 10x after
# 10 consecutive epochs without validation-loss improvement.
opt = torch.optim.Adam(model.parameters(), lr=0.001)
from torch.optim.lr_scheduler import ReduceLROnPlateau
lr_scheduler = ReduceLROnPlateau(opt, mode='min', factor=0.1, patience=10)
def metric_function(output, target):
    """Count correct top-1 predictions in a batch.

    output: (N, C) logits; target: (N,) integer class indices.
    Returns a Python int. The original returned a 0-dim tensor, which let
    device tensors accumulate into the metric history and later break
    matplotlib plotting; .item() keeps everything a plain number.
    """
    _, argmax = torch.max(output, dim = 1)
    corrects = (argmax == target).sum()
    return corrects.item()
# Summed (not mean) CE loss: train_val divides the epoch total by the dataset
# size, giving an exact per-sample average even with a ragged final batch.
loss_function = nn.CrossEntropyLoss(reduction = 'sum')
def train_val(model, params):
    """Train `model` for num_epochs, validating after every epoch.

    params keys: 'num_epochs', 'loss_func' (batch-summed loss), 'optimizer',
    'train_dl', 'val_dl', 'lr_scheduler' (ReduceLROnPlateau-style, stepped
    with the val loss), 'path2weights' (checkpoint path).

    Saves the weights with the best validation loss to path2weights and
    reloads them into `model` before returning.

    Returns (model, loss_history, metric_history); the histories are dicts
    with 'train'/'val' lists of per-sample floats, one entry per epoch.

    NOTE(review): relies on the module-level `device`, `get_lr` and
    `metric_function`.
    """
    num_epochs=params['num_epochs']
    loss_func=params["loss_func"]
    opt=params["optimizer"]
    train_dl=params["train_dl"]
    val_dl=params["val_dl"]
    lr_scheduler=params["lr_scheduler"]
    path2weights=params["path2weights"]
    loss_history = {'train': [], 'val': []}
    metric_history = {'train': [], 'val': []}
    best_model_weight = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_time = time.time()
    # Dataset sizes are loop-invariant: hoist them out of the epoch loop.
    len_data_train = len(train_dl.dataset)
    len_data_val = len(val_dl.dataset)
    for epoch in range(num_epochs):
        current_lr = get_lr(opt)
        print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs-1, current_lr))
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        running_metric = 0.0
        for inputs, targets in train_dl:
            inputs , targets = inputs.to(device) , targets.to(device)
            outputs = model(inputs)
            loss_batch = loss_func(outputs, targets)
            metric_batch = metric_function(outputs, targets)
            opt.zero_grad()
            loss_batch.backward()
            opt.step()
            running_loss += loss_batch.item()
            # float() keeps the accumulator a plain number whether
            # metric_function returns an int or a 0-dim (GPU) tensor; the
            # original accumulated tensors, which later break plotting.
            running_metric += float(metric_batch)
        train_loss = running_loss / len_data_train
        train_metric = running_metric / len_data_train
        loss_history['train'].append(train_loss)
        metric_history['train'].append(train_metric)
        # ---- validation pass (no gradients) ----
        model.eval()
        with torch.no_grad():
            running_loss = 0.0
            running_metric = 0.0
            for inputs, targets in val_dl:
                inputs , targets = inputs.to(device) , targets.to(device)
                outputs = model(inputs)
                loss_batch = loss_func(outputs, targets)
                metric_batch = metric_function(outputs, targets)
                running_loss += loss_batch.item()
                running_metric += float(metric_batch)
            val_loss = running_loss / len_data_val
            val_metric = running_metric / len_data_val
            loss_history['val'].append(val_loss)
            metric_history['val'].append(val_metric)
        # Checkpoint whenever validation loss improves.
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_weight = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), path2weights)
            print('Copied best model weights!')
            print('Get best val_loss')
        lr_scheduler.step(val_loss)
        print('train loss: %.6f, train accuracy: %.2f' %(train_loss, 100*train_metric))
        print('val loss: %.6f, val accuracy: %.2f' %(val_loss, 100*val_metric))
        print('time: %.4f min' %((time.time()-start_time)/60))
        print('-'*50)
    # Restore the best weights observed during training.
    model.load_state_dict(best_model_weight)
    return model, loss_history, metric_history
# Everything train_val needs, bundled in one dict.
params_train = {
'num_epochs':30,
'optimizer':opt,
'loss_func':loss_function,
'train_dl':train_dl,
'val_dl':val_dl,
'lr_scheduler':lr_scheduler,
'path2weights':'/home/mskang/hyeokjong/birds/best_model.pt',
}
# model was already moved to `device` above; this .to() is a harmless no-op.
model = model.to(device)
model, loss_hist, metric_hist = train_val(model, params_train)
num_epochs=params_train["num_epochs"]

# The metric history may contain 0-dim CUDA tensors (metric_function returns
# a tensor in the original code); matplotlib cannot plot CUDA tensors, so
# coerce every entry to a plain float first. Loss entries are already floats.
train_acc = [float(m) for m in metric_hist["train"]]
val_acc = [float(m) for m in metric_hist["val"]]
epochs = range(1, num_epochs + 1)

plt.title("Train-Val Loss")
plt.plot(epochs, loss_hist["train"], label="train")
plt.plot(epochs, loss_hist["val"], label="val")
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()

plt.title("Train-Val Accuracy")
plt.plot(epochs, train_acc, label="train")
plt.plot(epochs, val_acc, label="val")
plt.ylabel("Accuracy")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()