PyTorch의 data loading utiliy에 대해 이해하고 나만의 DataLoader를 구성해보자.
Reference : PyTorch offical document
일단은 기본적으로 torch module을 import해준다.
import torch
from torch import Tensor
from torch.utils.data import Dataset, DataLoader, SequentialSampler, RandomSampler, SubsetRandomSampler, BatchSampler
DataLoader는 Dataset에 대한 iterative한 객체로 data를 추출한다.
DataLoader는 아래와 같은 argument를 받는다.
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,
batch_sampler=None, num_workers=0, collate_fn=None,
pin_memory=False, drop_last=False, timeout=0,
worker_init_fn=None, *, prefetch_factor=2,
persistent_workers=False)
이 포스팅에서는 특히 Dataset, Sampler, Collate에 대해서 다루어 보려고 한다.
DataLoader에서 가장 중요한 요소로, data를 추출할 source이다. Map-style과 iterative-style이 있는데, 여기서는 map-style만 다루기로 한다.
Map-style dataset은 데이터 sample을 indices/key로부터 mapping하는 방식이다.
즉, dataset[idx]를 통해 data에 접근할 수 있다.
Map-style dataset은 __len__과 __getitem__ 프로토콜을 가지고 있어야 한다.
가장 기본적인 dataset의 구조는 아래와 같이 정의될 수 있다.
class DataWrapper(Dataset):
def __init__(self):
self.file_list = [[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]
self.label = [[0],[0],[0],[0],[0],[0],[0],[0],[1],[1]]
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
data = self.file_list[idx]
label = self.label[idx]
return {"input": data, "label": label}
my_dataset = DataWrapper()
print(my_dataset[0])
> print output
{'input': [0], 'label': [0]}
사실 Dataset만 준비된다면 바로 DataLoader를 이용해 데이터를 loading해볼 수 있다.
my_dataloader = DataLoader(
my_dataset,
batch_size = 1
)
for data in my_dataloader:
print(data)
> print output
{'input': [tensor([0])], 'label': [tensor([0])]}
{'input': [tensor([1])], 'label': [tensor([0])]}
{'input': [tensor([2])], 'label': [tensor([0])]}
{'input': [tensor([3])], 'label': [tensor([0])]}
{'input': [tensor([4])], 'label': [tensor([0])]}
{'input': [tensor([5])], 'label': [tensor([0])]}
{'input': [tensor([6])], 'label': [tensor([0])]}
{'input': [tensor([7])], 'label': [tensor([0])]}
{'input': [tensor([8])], 'label': [tensor([1])]}
{'input': [tensor([9])], 'label': [tensor([1])]}
원하는 batch_size로 데이터를 loading할 수도 있다. 이때, drop_last=True옵션을 주면, batch_size에 맞지 않는 batch를 버려준다.
my_dataloader = DataLoader(
my_dataset,
batch_size = 4
)
for data in my_dataloader:
print(data)
--------------------------------------------------
my_dataloader = DataLoader(
my_dataset,
batch_size = 4
)
for data in my_dataloader:
print(data)
> print output
{'input': [tensor([0, 1, 2, 3])], 'label': [tensor([0, 0, 0, 0])]}
{'input': [tensor([4, 5, 6, 7])], 'label': [tensor([0, 0, 0, 0])]}
{'input': [tensor([8, 9])], 'label': [tensor([1, 1])]}
--------------------------------------------------
> print output
{'input': [tensor([0, 1, 2, 3])], 'label': [tensor([0, 0, 0, 0])]}
{'input': [tensor([4, 5, 6, 7])], 'label': [tensor([0, 0, 0, 0])]}
근데 결과를 보면 항상 Dataset에서 정의한 index 순서대로만 data가 출력되는 것을 확인할 수 있다. 하지만 실제 모델을 학습시킬 때에는 data distribution 등을 고려해서 sampling strategy를 변경할 수 있다.
이를 위한 것이 바로 Sampler이다.
Sampler는 dataloader가 data를 추출하는 순서를 정해주는 객체로 dataset을 인자로 받고, data의 index를 반환한다.
보통은 Sampler를 선언해서 사용하지는 않고, DataLoader의 shuffle argument를 통해 자동적으로 random하게 index를 섞어줄 수 있다.
my_dataloader = DataLoader(
my_dataset,
batch_size = 1,
shuffle= True
)
for data in my_dataloader:
print(data)
> print output
{'input': [tensor([3])], 'label': [tensor([0])]}
{'input': [tensor([6])], 'label': [tensor([0])]}
{'input': [tensor([5])], 'label': [tensor([0])]}
{'input': [tensor([9])], 'label': [tensor([1])]}
{'input': [tensor([1])], 'label': [tensor([0])]}
{'input': [tensor([4])], 'label': [tensor([0])]}
{'input': [tensor([8])], 'label': [tensor([1])]}
{'input': [tensor([0])], 'label': [tensor([0])]}
{'input': [tensor([2])], 'label': [tensor([0])]}
{'input': [tensor([7])], 'label': [tensor([0])]}
하지만 index를 내가 원하는대로 index를 직접 다루고 싶다면 torch.utils.data.Sampler 클래스를 이용해 data loading에 사용되는 indices/keys 순서를 정할 수 있다. (이때 shuffle은 False)
Sampler는 단순히 인덱스 순서를 다루는 class라고 생각하면 된다.
my_sampler = SequentialSampler(my_dataset)
for idx in my_sampler:
print(idx)
> print output
0 1 2 3 4 5 6 7 8 9
my_sampler = RandomSampler(my_dataset)
for idx in my_sampler:
print(idx)
> print output
2 9 7 0 8 1 6 5 3 4
train_idx = [0,1,2,3,4]
valid_idx = [5,6,7,8,9]
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)
for idx in train_sampler:
print(idx)
--------------------------------------------------
for idx in valid_sampler:
print(idx)
> print output
3 4 1 2 0
--------------------------------------------------
6 7 5 8 9
my_sampler = SequentialSampler(my_dataset)
my_batch_sampler = BatchSampler(my_sampler, batch_size=4, drop_last=False)
for idx in my_batch_sampler:
print(idx)
> print output
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]
위에서 소개한 Sampler는 PyTorch에서 이미 구현되어있는 것들이고, 사실은 나만의 Sampler를 만들고 싶었다. 보통은 label distribution에 따라서 over-sampling이나 undersampling을 하기 위해 필요하다. 사실 그것도 weightedSampling 등을 쓰면 되지만 알아두면 나중에 쓸 일이 있을 것 같다.
Sampler는 __iter__와 __len__ 프로토콜을 포함하도록 만들어야 한다.
예를 들어 RandomSampling은 아래와 같은 역할을 수행한다.
class CustomSampler(Sampler):
def __init__(self, dataset, replacement=True):
self.dataset = dataset
self.indices = list(range(len(dataset)))
def __iter__(self):
random.shuffle(self.indices)
return iter(self.indices)
def __len__(self):
return len(self.dataset)
my_sampler = CustomSampler(my_dataset, replacement=False)
for idx in my_sampler:
print(idx)
> print output
7 1 2 4 3 0 6 9 8 5
이를 응용해서 학습 시, class label에 따라 batch에 원하는 class를 갖는 data를 넣어줄 수도 있다. 결국에 Sampler는 내가 정의한 Dataset의 길이만큼의 index sequence를 조작하는 모듈이므로 내가 원하는 data의 index를 원하는 위치에 줄 세워서 순서대로 loading하게 하면 된다.
예를 들어 아래는 하나의 batch에 무조건 하나의 minor class가 포함되도록 하는 Sampler이다. 적은 갯수의 class의 indices를 파악한 후 해당 index가 각 batch size묶음 안에 무조건 포함되도록 최종 indices sequence를 짜주면 된다.
class CustomStratifiedSampler(Sampler):
def __init__(self, my_dataset, batch_size):
self.labels = np.array(my_dataset.label) # [[0], [0], [0], [0], [0], [0], [0], [0], [1], [1]]
self.minority_indices = [i for i, label in enumerate(labels) if label[0] == 1]
self.majority_indices = [i for i, label in enumerate(labels) if label[0] == 0]
self.batch_size = batch_size
# random sampling을 위해서 indices를 섞기
random.shuffle(self.minority_indices)
random.shuffle(self.majority_indices)
# merged indices가 해당 sampler의 최종 sampling indices sequence가 될 것임
self.merged_indices = []
minor_iter = iter(self.minority_indices)
major_iter = iter(self.majority_indices)
# Dataset의 length가 될 때까지 minor class를 먼저 넣고, batch만큼 major class를 채워넣는 방식
while len(self.merged_indices) < len(self.labels):
try:
self.merged_indices.append(next(minor_iter))
except:
minor_iter = iter(self.minority_indices)
self.merged_indices.append(next(minor_iter))
for _ in range(self.batch_size - 1):
if len(self.merged_indices) < len(self.labels):
self.merged_indices.append(next(major_iter))
def __iter__(self):
return iter(self.merged_indices)
def __len__(self):
return len(self.labels)
sampler = CustomStratifiedSampler(my_dataset, batch_size=4)
data_loader = DataLoader(my_dataset, batch_size=4, sampler=sampler, drop_last = False)
for epoch in range(10):
print("==== epoch {} ====".format(epoch))
for data in data_loader:
print(data)
> print output
==== epoch 0 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 1 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 2 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 3 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 4 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 5 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 6 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 7 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 8 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
==== epoch 9 ====
{'input': [tensor([8, 1, 7, 4])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([9, 2, 6, 0])], 'label': [tensor([1, 0, 0, 0])]}
{'input': [tensor([8, 5])], 'label': [tensor([1, 0])]}
비슷한 방식으로 BatchSampler를 활용할 수도 있다.
class CustomStratifiedSampler(BatchSampler):
def __init__(self, my_dataset, batch_size, drop_last):
labels = np.array(my_dataset.label) # [[0], [0], [0], [0], [0], [0], [0], [0], [1], [1]]
self.minority_indices = [i for i, label in enumerate(labels) if label[0] == 1]
self.majority_indices = [i for i, label in enumerate(labels) if label[0] == 0]
self.batch_size = batch_size
self.drop_last = drop_last
def __iter__(self):
random.shuffle(self.minority_indices)
random.shuffle(self.majority_indices)
batch = []
minor_iter = iter(self.minority_indices)
major_iter = iter(self.majority_indices)
for _ in range(len(self)): # Sampler의 길이만큼 yield할 것이므로
try:
batch.append(next(minor_iter))
except:
minor_iter = iter(self.minority_indices)
batch.append(next(minor_iter))
for _ in range(self.batch_size - 1):
batch.append(next(major_iter))
yield batch
batch.clear()
def __len__(self):
if self.drop_last:
return min(len(self.minority_indices), len(self.majority_indices) // (self.batch_size - 1))
else:
return (len(self.minority_indices) + len(self.majority_indices) - 1) // self.batch_size
sampler = CustomStratifiedSampler(my_dataset, batch_size= 4, drop_last=True)
data_loader = DataLoader(my_dataset, batch_sampler=sampler)
Collate_fn은 말 그대로 index에 맞게 추출한 데이터를 batch로 묶을 때, 어떻게 함께 합칠 지를 결정하는 함수이다.
내 경우에는 아직 크게 사용할 일이 없어서 자세히 정리하지는 않았다.
Collate_fn이 dataloader에서 사용되는 개념 자체는 아래와 같다. sampler가 index를 추출해주면 이에 맞게 dataset에서 해당 index를 뽑아서 batch processing을 해준다. (여기서 batch sampler가 뽑는 index는 indices의 list일 것임)
for index in sampler:
yield collate_fn(dataset[index])
만약에 dataloader에 collate_fn을 선언해주지 않으면, automatic batching을 하게 되고, 그냥 default collate fn을 사용하게 된다. default collate fn은 받은 index에 대해서 데이터를 묶어 batch size만큼의 outer dimension을 형성하게 되고 데이터 type을 torch Tensor로 변환한다.
예를 들어 아래와 같이 하나의 batch 예시를 받으면 input에 100을 곱해서 batch로 묶어주는 함수를 만들어보자.
def my_collate_fn(batch):
inputs = [sample['input'] for sample in batch]
labels = [sample['label'] for sample in batch]
inputs = torch.from_numpy(np.array(inputs)).type(torch.float).view(-1)
labels = torch.from_numpy(np.array(labels)).type(torch.float).view(-1)
inputs = inputs * 100
labels = labels
return {'input': inputs,
'label': labels}
이를 위에서 썼던 같은 dataset에 대해서 적용을 해보면, 아래와 같이 원하는 대로 batch 내부 데이터를 processing할 수 있다.
my_sampler = SequentialSampler(my_dataset)
my_dataloader = DataLoader(
my_dataset,
batch_size= 3,
collate_fn=my_collate_fn
)
for data in my_dataloader:
print(data)
원래 collate_fn을 사용하지 않을 때 output
> print output
{'input': [tensor([0, 1, 2])], 'label': [tensor([0, 0, 0])]}
{'input': [tensor([3, 4, 5])], 'label': [tensor([0, 0, 0])]}
{'input': [tensor([6, 7, 8])], 'label': [tensor([0, 0, 1])]}
{'input': [tensor([9])], 'label': [tensor([1])]}
사용한 후 output
> print output
{'input': tensor([ 0., 100., 200.]), 'label': tensor([0., 0., 0.])}
{'input': tensor([300., 400., 500.]), 'label': tensor([0., 0., 0.])}
{'input': tensor([600., 700., 800.]), 'label': tensor([0., 0., 1.])}
{'input': tensor([900.]), 'label': tensor([1.])}
아무 생각 없이 하던대로 사용하던 Torch의 dataset과 dataloader를 조금 정리해봤다. 사실 이런거 몰라도 머신 러닝 코드를 다루는데 전혀 지장은 없지만서도 이해를 하고 사용할수록 전체적인 흐름도 구상하기가 편하고 구현도 자유로워서 좋다.
pin_memory나 workers는 나도 거의 신경 안쓰고 사는데, 이런것도 잘 이해해보면 좋지 않을까 싶다.