
PyTorch의 사용법과 Tensor, 자동 미분 엔진에 대해서 먼저 배우자.
Tensor는 차원(rank, order)에 대한 특징을 갖는 수학적 객체이다.
.to로 데이터 type을 바꿀 수 있다. .shape, .reshape, .view(원본 데이터가 연속적이면 reshape), .T, .matmul, @(.matmul과 동일)
requires_grad가 True인 terminal node에 computational graph를 짓는다.
grad 함수를 호출하여 각 파라미터의 손실의 gradient를 계산할 수 있다..backward를 호출하여 모든 leaf node의 gradient를 계산할 수 있으며, 결과는 텐서의 .grad에 저장된다.torch.nn.Module base class로 model을 build하고 train한다.__init__ 생성자에서 forward method와 layer가 어떻게 상호작용할지를 정의한다.backward method는 손실함수의 gradient를 계산한다.grad_fn을 알 수 있다.Sequential class는 필수는 아니지만, 특정 순서에 실행시킬 때 유용하다.torch.no_grad()로 훈련 이후 예측 때 불필요한 학습을 막을 수 있다.
__init__, __getitem__, __len__가 필수이다. __getitem__에서는 인덱스를 통해 데이터셋에서 정확히 하나의 항목만 반환한다.__len__은 데이터셋의 길이를 반환한다.drop_last = True를 통해 마지막 batch가 나누어 떨어지지 않으면 drop할 수 있다.num_workers CPU의 서브 프로세스 수를 지정하여 병렬적인 data 적재를 할 수 있다..zero_grad()를 통해 의도하지 않은 gradient 축적을 막는다..step()을 통해 gradient를 사용하여 모델 파라미터를 업데이트한다..train()과 .eval()로 훈련과 추론 모드를 설정한다. 이러한 구분은 dropout이나 batch normalization layers를 다르게 두기 위해서는 필수적이다.softmax대신 torch.argmax로 예측한 결과값이 어떤 클래스에 속하는지 return 받을 수 있다. 이때 dim=1이면 각 행에 대한 최대값이다.==와 torch.sum 통해 예측과 실제값이 같은지를 T/F로 확인하거나 count할 수 있다.model.save()로 모델을 저장하고, model.load_state_dict(torch.load("model.pt"))로 모델을 파이썬 딕셔너리 형태로 load한다.torch.cuda.is_available()로 GPU 사용을 확인하며, .to()로 GPUs 간 이동이 가능하다. 하지만, 같은 device에 존재해야만 한다.# device를 정의하고 모델이나 메소드에 적용한다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
# NEW imports:
import os
import platform
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
def ddp_setup(rank, world_size):
"""
Arguments:
rank: a unique process ID
world_size: total number of processes in the group
"""
# rank of machine running rank:0 process
# here, we assume all GPUs are on the same machine
os.environ["MASTER_ADDR"] = "localhost"
# any free port on the machine
os.environ["MASTER_PORT"] = "12345"
if platform.system() == "Windows":
# Disable libuv because PyTorch for Windows isn't built with support
os.environ["USE_LIBUV"] = "0"
# initialize process group
if platform.system() == "Windows":
# Windows users may have to use "gloo" instead of "nccl" as backend
# gloo: Facebook Collective Communication Library
init_process_group(backend="gloo", rank=rank, world_size=world_size)
else:
# nccl: NVIDIA Collective Communication Library
init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def prepare_dataset():
train_ds = ToyDataset(X_train, y_train)
test_ds = ToyDataset(X_test, y_test)
train_loader = DataLoader(
dataset=train_ds,
batch_size=2,
shuffle=False, # NEW: False because of DistributedSampler below
pin_memory=True, # 훈련 동안 빠른 메모리 전송 허용
drop_last=True,
# NEW: chunk batches across GPUs without overlapping samples:
sampler=DistributedSampler(train_ds) # NEW
)
test_loader = DataLoader(
dataset=test_ds,
batch_size=2,
shuffle=False,
)
return train_loader, test_loader
# NEW: wrapper
def main(rank, world_size, num_epochs):
ddp_setup(rank, world_size) # NEW: initialize process groups
train_loader, test_loader = prepare_dataset()
model = NeuralNetwork(num_inputs=2, num_outputs=2)
model.to(rank)
optimizer = torch.optim.SGD(model.parameters(), lr=0.5)
model = DDP(model, device_ids=[rank]) # NEW: wrap model with DDP
# the core model is now accessible as model.module
for epoch in range(num_epochs):
# NEW: Set sampler to ensure each epoch has a different shuffle order
train_loader.sampler.set_epoch(epoch)
model.train()
for features, labels in train_loader:
features, labels = features.to(rank), labels.to(rank) # New: use rank
logits = model(features)
loss = F.cross_entropy(logits, labels) # Loss function
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"[GPU{rank}] Epoch: {epoch+1:03d}/{num_epochs:03d}"
f" | Batchsize {labels.shape[0]:03d}"
f" | Train/Val Loss: {loss:.2f}")
model.eval()
train_acc = compute_accuracy(model, train_loader, device=rank)
print(f"[GPU{rank}] Training accuracy", train_acc)
test_acc = compute_accuracy(model, test_loader, device=rank)
print(f"[GPU{rank}] Test accuracy", test_acc)
destroy_process_group() # NEW: cleanly exit distributed mode
if __name__ == "__main__":
print("Number of GPUs available:", torch.cuda.device_count())
# NEW: spawn new processes
# note that spawn will automatically pass the rank
num_epochs = 3
world_size = torch.cuda.device_count()
mp.spawn(main, args=(world_size, num_epochs), nprocs=world_size)
# nprocs=world_size spawns one process per GPU
multiprocessesing.spawn으로 새로운 프로세스를 GPU 수만큼 생성한다. multiple input에 함수를 병렬적으로 적용한다.rank는 GPU ID로 사용하는 프로세스 ID를 의미하고 자동으로 pass한다.model은 DDP로 감싸서 훈련 동안 모든 process에서의 gradient를 동기화한다.ddp_setup에서 IP와 port를 정의 후 GPU간 소통을 위해 만들어진 NCCL backend에서 프로세스 group을 초기화한다.CUDA_VISIBLE_DEVICES=0, 2를 통해 사용할 GPU를 지정할 수 있다.