Single-Node Multi-GPU Training

JInwoo·2023년 5월 30일
0

이번 튜토리얼에서는 single gpu training code로 부터 multi gpu training을 위해 어떤 부분이 바뀌고 추가되는지에 대한 내용을 다룬다. 코드는 다음 github repo에서 확인할 수 있다.

  • Note
    만약 모델이 BatchNorm layer를 가지고 있다면, 모델의 모든 BatchNorm layer를 SyncBatchNorm으로 바꿔줘야 한다. 이를 위한 helper function으로 pytorch는 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 함수를 제공한다.

Diff for single_gpu.py v/s multigpu.py

imports

  • torch.multiprocessing는 Python의 native multiprocessing을 감싼 Pytorch의 wrapper이다.
  • distributed process group은 모든 통신 가능하고 동기화 가능한 process들을 담고 있다.

아래는 DDP를 위해 추가로 import한 모듈들이다.

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os 

Constructing the process group

  • process group은 TCP로 초기화 되거나 shared file-system으로 초기화 될 수 있다.
  • init_process_group 함수는 distributed process group을 초기화 해주는 함수이다.
  • torch.cuda.set_device()는 각 process에 default GPU를 설정한다. 이는 GPU:0가 중단 되거나 과도한 메모리 사용을 방지한다.
def ddp_setup(rank: int, world_size: int):
   """
   Args:
       rank: Unique identifier of each process
       world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"   # machine의 IP 주소. single machine이라 localhost 입력
   os.environ["MASTER_PORT"] = "12355"   # 임의의 free port number
   init_process_group(backend="nccl", rank=rank, world_size=world_size)
   torch.cuda.set_device(rank)

MASTER_ADDR은 rank0 process가 실행되는 곳의 주소이기도 하다.

Constructing the DDP model

self.model = DDP(model, device_ids=[gpu_id])

model.to(device)를 사용하던 것 대신, DDP를 사용.

Distributing input data

  • DistributedSampler는 모든 distributed processes에 input data를 잘라 넣는다.
  • 아래 코드에 따르면 각 process는 32 sample의 input batch를 받는다. 효율적인 batch size는 32 * nproc으로 계산 할 수 있다.
train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,   # shuffle 사용 x
    sampler=DistributedSampler(train_dataset),
)
  • 매 epoch의 시작 전 DistributedSampler의 set_epoch() 함수를 호출 해야 shuffle이 수행된다.
def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # set_epoch 호출
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

Saving model checkpoints

  • ckpt 저장은 하나의 process에서만 이루어 지도록 해야한다. 그렇지 않으면 모든 process에서 동일한 모델의 ckpt 저장을 수행하게된다.
ckp = self.model.module.state_dict()   # model이 DDP 객채로 wrap되어 module로 호출해야함.
...
...
if self.gpu_id == 0 and epoch % self.save_every == 0:   # 하나의 process를 사용하도록 조건 추가
  self._save_checkpoint(epoch)
  • WARNING
    Collective calls는 모든 distributed process에서 실행되는 함수이다. 특정 process의 states나 values를 수집할 때 사용한다. Collective calls는 모든 rank에서 collective code가 실행 될 수 있어야한다.

Running the distributed training job

  • device arg를 대신하여 rank와 world_size를 추가
  • rank의 값은 mp.spaw을 호출하면 DDP에서 자동으로 할당한다.
  • world_size는 process의 수로, GPU training에서는 GPU의 수와 동일하다.
def main(rank, world_size, total_epochs, save_every):
   ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
   trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
   destroy_process_group()   # 모든 process 종료

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
   world_size = torch.cuda.device_count()
   mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

참고 문서

profile
Jr. AI Engineer

0개의 댓글

관련 채용 정보