Pytorch starter - FasterRCNN Train
- FasterRCNN from torchvision
- Use Resnet50 backbone
- Albumentation enabled (simple flip for now)
OverView
목표
이번 Global Wheat Detection Competition은 이미지에서 밀의 머리를 감지하는 모델을 개발하는 것이 목표이다.
입출력 데이터의 특성
밀 식물의 사진과 각 이미지에 대한 밀 머리 위치의 경계 상자 정보를 포함하고 있다.
이에 따라 출력 데이터는 주어지는 이미지에 대해 모델에 의해 예측된 밀 머리 위치의 경계상자를 포함해야 한다.
Code
Pytorch를 사용하기 위한 기본적인 library들을 import한다.
특이하게 detection을 위한 FasterRCNN과 image augmentations을 위한 albumentations가 있다는 것을 확인할 수 있다.
import pandas as pd import numpy as np import cv2 import os import re from PIL import Image import albumentations as A from albumentations.pytorch.transforms import ToTensorV2 import torch import torchvision from torchvision.models.detection.faster_rcnn import FastRCNNPredictor from torchvision.models.detection import FasterRCNN from torchvision.models.detection.rpn import AnchorGenerator from torch.utils.data import DataLoader, Dataset from torch.utils.data.sampler import SequentialSampler import matplotlib.pyplot as plt DIR_INPUT = '/kaggle/input/global-wheat-detection' DIR_TRAIN = f'{DIR_INPUT}/train' DIR_TEST = f'{DIR_INPUT}/test' train_df = pd.read_csv(f'{DIR_INPUT}/train.csv') train_df.shape
train_df에 들어있는 bbox feature값을 보다 편리하게 [x, y, w, h] 각각으로 feature를 나누기 위한 가벼운 전처리 작업을 해준다.
# 정리된 bbox값을 x,y,w,h feature로 구분해주는 작업. train_df['x'] = -1 train_df['y'] = -1 train_df['w'] = -1 train_df['h'] = -1 def expand_bbox(x) : r = np.array(re.findall("[0-9]+[.]?[0-9]*", x)) if len(r) == 0 : r = [-1, -1, -1, -1] return r train_df[['x', 'y', 'w', 'h']] = np.stack(train_df['bbox'].apply(lambda x : expand_bbox(x))) train_df.drop(columns=['bbox'], inplace = True)
image_id feature의 고유한 값들을 모두 뽑아 image_ids에 저장한 후 그 중 마지막 665개의 데이터를 valid data(검증 데이터), 나머지를 train data(학습 데이터)로 나누어준다.
image_ids = train_df['image_id'].unique() valid_ids = image_ids[-665:] train_ids = image_ids[:-665]
# 검증 데이터셋 & 학습 데이터셋 분리 valid_df = train_df[train_df['image_id'].isin(valid_ids)] train_df = train_df[train_df['image_id'].isin(train_ids)]
valid_df.shape, train_df.shapeoutput: ((25006, 8), (122787, 8))
Custome Dataset을 왜 사용하는가?
데이터 로드와 증강을 포함한 전처리 과정을 유연하게 조정하기 위해 사용.
필수 magic method
Dataset을 상속받는 Custome Dataset class를 정의 할 때 필수로 overiding해야하는 magic method가 있다.
중요하게는 getitem magic method에서 특정 1개의 샘플을 가져올 때 image처리된 정보를 return하기 위해 processing과정을 진행한다.
그렇게 해당 index에 해당하는 image, target, image_id를 return하게 해준다.
class WheatDataset(Dataset) : # 데이터셋의 전처리를 해주는 부분 def __init__(self, dataframe, >image_dir, transforms=None) : super().__init__() self.image_ids = dataframe['image_id'].unique() self.df = dataframe self.image_dir = image_dir # 데이터 증강이나 전처리 사용할 것인지 안할 것인지. self.transforms = transforms # 데이터셋에서 특정 1개의 샘플을 가져오는 함수 def __getitem__(self, index: int) : # 주어진 index에 해당하는 이미지 id 가져오기 image_id = self.image_ids[index] # 해당 image_id들을 포함하는 데이터 프레임 선택하기 records = self.df[self.df['image_id'] == image_id] # 이미지 파일을 읽고 RGB로 변환하여 정규화 image = cv2.imread(f'{self.image_dir}/{image_id}.jpg', cv2.IMREAD_COLOR) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32) image /= 255.0 # 바운딩 박스 좌표를 배열로 변환 boxes = records[['x','y','w','h']].values.astype(float) # (x1,y1), (x2,y2) 좌표로 변환 boxes[:, 2] = boxes[:, 0] + boxes[:, 2] boxes[:, 3] = boxes[:, 1] + boxes[:, 3] area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]) area = torch.as_tensor(area, dtype=torch.float32) # 모든 객체에 대해 동일한 라벨(1)을 부여 labels = torch.ones((records.shape[0], ), dtype=torch.int64) # 모든 객체가 crowd가 아니라고 가정(일단 답이 아니라고 가정) iscrowd = torch.zeros((records.shape[0], ), dtype=torch.int64) target = {'boxes' : boxes, 'labels' : labels, 'image_id' : torch.tensor([index]), 'area' : area, 'iscrowd' : iscrowd} # 변환이 설정되어 있는 경우(데이터 증강을 하는 경우) if self.transforms : sample = { 'image' : image, 'bboxes' : >target['boxes'], 'labels' : labels } sample = self.transforms(**sample) image = sample['image'] target['boxes'] = torch.stack(tuple(map(torch.tensor, zip(*sample['bboxes'])))).permute(1,0) return image, target, image_id # 데이터셋의 길이. 즉, 총 샘플의 수를 적어주는 부분 def __len__(self) -> int : return self.image_ids.shape[0]
Data augmentation을 위해 albumentations library를 import한다.
0.5확률로 image를 flip하고 Tensor로 변환하는 변환과정을 거친다.
import albumentations as A from albumentations.pytorch.transforms import ToTensorV2 def get_train_transform() : # Compose는 여러 변환을 하나로 묶어주는 역할을 함. # 리스트 형태로 여러 변환을 받아 순차적으로 적용. return A.Compose([ A.Flip(0.5), ToTensorV2(p=1.0) ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']}) def get_valid_transform() : return A.Compose([ ToTensorV2(p=1.0) ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2 # 1 class (Wheat) + background # get number of input features for the classifier(1024) in_features = model.roi_heads.box_predictor.cls_score.in_features # replace the pre_trained head with a new one model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
class Averager : def __init__(self) : self.current_total = 0.0 self.iterations = 0.0 def send(self, value) : self.current_total += value self.iterations += 1 @property def value(self) : if self.iterations == 0 : return 0 else : return 1.0 * self.current_total / self.iterations def reset(self) : self.current_total = 0.0 self.iterations = 0.0
DataLoader를 왜 사용하는가?
DataLoader는 Dataset을 샘플에 쉽게 접근할 수 있도록 순회 가능한 객체로 감싸며 데이터를 batch 단위로 load하는 역할을 한다.
def collate_fn(batch) : return tuple(zip(*batch)) train_dataset = WheatDataset(train_df, DIR_TRAIN, get_train_transform()) valid_dataset = WheatDataset(valid_df, DIR_TRAIN, get_valid_transform()) # split the dataset in train and test set # len(train_dataset)만큼 랜덤 순열을 생성한다. indices = torch.randperm(len(train_dataset)).tolist() # DataLoader 생성 train_data_loader = DataLoader( train_dataset, batch_size = 16, shuffle = False, num_workers = 4, collate_fn = collate_fn) valid_data_loader = DataLoader( valid_dataset, batch_size = 8, shuffle = False, num_workers = 4, collate_fn = collate_fn)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
images, targets, image_ids = next(iter(train_data_loader)) images = list(image.to(device) for image in images) targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
boxes = targets[0]['boxes'].cpu().numpy().astype(np.int32) sample = images[0].permute(1,2,0).cpu().numpy()
print(boxes.shape, sample.shape)output: (47, 4) (1024, 1024, 3)
sample&Boxes 이미지 확인
fig, ax = plt.subplots(1, 1, figsize = (16,8)) for box in boxes : cv2.rectangle(sample, (box[0], box[1]), (box[2], box[3]), (255, 255, 255), 3) ax.set_axis_off() ax.imshow(sample)

model.to(device) params = [p for p in model.parameters() if p.requires_grad] optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005) lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 3, gamma= 0.1) lr_scheduler = None num_epochs = 2
loss_hist = Averager() itr = 1 for epoch in range(num_epochs) : loss_hist.reset() for images, targets, image_ids in train_data_loader : images = list(image.to(device) for image in images) targets = [{k:v.to(device) for k, v in t.items()} for t in targets] loss_dict = model(images, targets) losses = sum(loss for loss in loss_dict.values()) loss_value = losses.item() loss_hist.send(loss_value) optimizer.zero_grad() losses.backward() optimizer.step() if itr % 50 == 0 : print(f'Iteration #{itr} loss: {loss_value}') itr += 1 if lr_scheduler is not None : lr_scheduler.step() print(f'Epoch #{epoch} loss: {loss_hist.value}')Iteration #50 loss: 1.099586668477545
Iteration #100 loss: 0.8676293716363129
Iteration #150 loss: 0.865155978790981
Epoch #0 loss: 1.050750487340164
Iteration #200 loss: 0.9103968866132544
Iteration #250 loss: 0.900699060360379
Iteration #300 loss: 0.7227939488408983
Epoch #1 loss: 0.8960660341487151
images, targets, image_ids = next(iter(valid_data_loader))
images = list(img.to(device) for img in images) targets = [{k:v.to(device) for k,v in t.items()} for t in targets]
boxes = targets[1]['boxes'].cpu().numpy().astype(np.int32) sample = images[1].permute(1,2,0).cpu().numpy()
model.eval() cpu_device = torch.device('cpu') outputs = model(images) outputs = [{k:v.to(cpu_device) for k,v in t.items()} for t in outputs]
fig, ax = plt.subplots(1, 1, figsize = (16,8)) for box in boxes : cv2.rectangle(sample, (box[0], box[1]), (box[2], box[3]), (255, 255, 255), 3) ax.set_axis_off() ax.imshow(sample)
torch.save(model.state_dict(), 'fasterrcnn_resnet50_fpn.pth')