
`
이러한 방법은 사실 학습 시간은 절감되나, 전체 메모리 사용량 자체는 증가한다. 왜냐하면 모든 GPU로 모델을 복제해야 하기 때문에 메모리 사용량 자체는 증가하기 때문이다.


| 방법 | 데이터 병렬처리 | 모델 병렬처리 |
|---|---|---|
| 병렬처리 대상 | 데이터 샘플 | 모델 파라미터 |
| 목표 | 학습 속도 가속화 | 거대모델 분할 적재 (초거대모델 학습) |
| 학습 속도 | GPU 개수에 비례 | 병목으로 인해 속도 저하 |
| 구현 난이도 | 비교적 쉬움 | 구현 및 관리가 어려움 |

단계별 설명
1. Initialization
모델을 모든 GPU에 복제하고, 데이터셋을 미니배치 단위로 나누어 각각 할당한다.
정리하자면 데이터 병렬화의 워크플로우에서 핵심은
1. Backward Pass에서 master GPU가
2. 모든 GPU들에서 계산된 gradient들을 모아서
3. 모델 복제본을 업데이트 하고,
4. Forward Pass에서 모든 GPU들이
5.master GPU의 업데이트된 모델을 복사하여 훈련하는 것이다.

DP Workflow의 문제는
Master Node에서의 병목이다. 마스터 노드 GPU가 너무 과로하는 것이다.그래서 나온 해결방안: DDP
AllReduce Operation
- 여러 디바이스에 흩어져 있는 데이터를 서로 동시에 주고 받기 위한 Collective Operation 중 하나이다.
- 여러 디바이스에 있는 데이터를 모두 모아 하나의 값으로 줄인 다음 (sum, max, min, average 등) 그 결과를 모든 디바이스에 정송한다.
Backward Pass에서 AllReduce 연산을 사용하여 모든 GPU들에서 계산된 gradient들을 동기화한 후에 각 GPU에서 독립적으로 모델 weights를 업데이트하여 모델을 훈련시킨다.

단계적으로 DDP를 설명하자면 아래와 같음
DP 방식과 Initialization 단계 자체는 동일함.
DP의 경우에 각각에 대한 Forward Pass 연산을 진행하고 그 도출된 Logits 값들을 마스터 노드에서 통합하여 Loss를 계산한 뒤에 다시 각 노드들에 나눠줬다면, DDP에서는 모델에 대한 카피 후에 나온 각 독립적인 Logits들을 알아서 각각의 노드에서 계산하게 됨.과정을 다시 요약하자면 아래와 같음
바로 이 지점이 DP와 다른 점, 마스터 GPU가 Logits들을 모으는 과정이 없음AllReduce연산을 거쳐 averaged gradients 값을 구하는 데에 사용되고이 과정에서 모든 GPU는 똑같은 averaged gradients 값을 가지게 되고 synchronized gradients 즉, 각 GPU의 가중치가 이때 다시 동기화가 된다Local gradients >> AllReduce 연산 >> averaged gradients(synchronized gradients) >> 이 gradients를 가지고 저마다 GPU에서 처음에 initialized 된 모델에 대해 weights 업데이트 실시>> 동일한 wieghts로 업데이트가 된 바, 이미 모델 또한 synchronized 되었다.)
| 라이브러리 | DP (DataParallel) | DDP (DistributedDataParallel) |
|---|---|---|
| Gradient 동기화 | master GPU가 모두 모아 평균내어 모델을 업데이트 | AllReduce 연산을 사용하여 모든 GPU가 동시에 업데이트 |
| 모델 복제 | master GPU의 모델을 매 iteration마다 복제 | 모든 GPU가 Initialization 단계에서 한 번만 복제 |
| 통신 오버헤드 | master GPU에 리소스가 몰려 통신비용 매우 높음 | master GPU가 없어 통신비용 낮음 |
| 장점 | 구현이 쉬움 | 효율적인 통신으로 학습 속도 증가 |
| 단점 | 병목 현상 가능성 | 구현이 상대적으로 복잡 |
global)서버내에서 구별)
| Host | GPU | Local Rank | Rank |
|---|---|---|---|
| host0 | GPU-0 | 0 | 0 |
| host0 | GPU-1 | 1 | 1 |
| host1 | GPU-0 | 0 | 2 |
| host1 | GPU-1 | 1 | 3 |
이 표는 두 개의 호스트와 각각 두 개의 GPU를 가진 분산 데이터 병렬(DDP) 설정을 나타낸다.
각 GPU는 분산 시스템 내에서의 위치를 나타내는 로컬 순위 및 글로벌 순위를 할당받는다.

ddp_setup 함수 추가
import torch
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
def ddp_setup(rank, world_size, gpu_list):
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
torch.cuda.set_device(gpu_list[rank])
init_process_group(backend="nccl", rank=rank, world_size=world_size)
DistributedSampler()를 사용하여 기존 DataLoader를 교체한다
이는 각 GPU에 다른 데이터가 들어가도록 분배하는 과정에 해당한다.
데이터를 GPU에 균등하게 나누는 작업으로써 이 샘플러 클래스가 해당 작업을 수행해준다.
셔플링 같은걸 할때 순서가 꼬일 수 있는데 그걸 방지할 수 있도록 Random Seed를 설정해서 데이터 순서를 동기화하거나, 각 GPU들을 동기화 하여 각 프로세스가 자기할당된 데이터만 처리를 하고 서로 다른 프로세스 안에서는 데이터중복이 일어나지 않도록 보장하는 역할을 이
DistributedSampler()가 수행해줌.
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True if torch.cuda.is_available() else False,
shuffle=True
)
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True if torch.cuda.is_available() else False,
shuffle=False,
sampler=DistributedSampler(datset)
)
각 GPU에서 계산된 gradients들을 동기화하기 위해 모델을 DDP로 wrapping 해줌
class Trainer:
def __init__(self, model, train_data, optimizer, gpu_id):
self.model = model.to(gpu_id)
class Trainer:
def __init__(self, model, train_data, optimizer, gpu_id, gpu_list):
self.model = DDP(model, device_ids = [gpu_id] # wrapmodel with DDP
def _run_epoch(self, epoch, max_epochs):
self.train_data_sampler.set_epoch(epoch) # set epoch for DistributedSampler
multiprocessing spawn 추가
def main(device, total_epochs, batch_size):
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, device)
trainer.train(total_epochs)
trainer.plot_loss()
def main(rank, world_size, total_epochs, batch_size, selected_gpus):
actual_gpu_id = selected_gpus[rank]
ddp_setup(rank, world_size, selected_gpus)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, actual_gpu_id, selected_gpus)
trainer.train(total_epochs)
destroy_process_group()
if __name__ == "__main__":
# Determine which GPUs to use
if args.gpus is not None:
available_gpus = torch.cuda.device_count()
for gpu in args.gpus:
if gpu >= available_gpus:
raise ValueError(f"GPU {gpu} is not available. Only {available_gpus} GPUs are present.")
gpu_list = args.gpus # Set gpu_list to specified GPUs
world_size = len(args.gpus)
else:
# Use all available GPUs
gpu_list = list(range(torch.cuda.device_count()))
world_size = len(gpu_list)
mp.spawn(
main,
nprocs=world_size,
args=(world_size, args.total_epochs, args.batch_size, gpu_list),
join=True
)
추가적으로
argument addiction으로 사용하려는 특정 GPU들을 지정할 수 있도록 파서를 받을 수 있다.
parser.add_argument('--gpus', nargs='+', type=int, default=None, help='Specific GPU IDs to use. If not specified, uses all available GPUs.')
여러 GPU간에 데이터를 분할하거나 모델 자체를 분할하여 여러 GPU에 걸쳐 훈련 프로세스를 병렬(Parallelism)화하는 학습 기법
주요 병렬화 방식:
일반적인 Data Parallelism (DP)
Distributed Data Parallelism (DDP)