Distributed Training - Horovod (1)

steadycode · November 22, 2022

To implement distributed training, there are several frameworks you need to know. Horovod is one of them: a general distributed training framework that works independently of the underlying deep learning framework. Its execution flow can be summarized briefly as follows, using PyTorch as the example.

# pseudo-code: wrap a standard PyTorch optimizer with Horovod
import torch
import horovod.torch as hvd

hvd.init()                                    # initialize Horovod (one process per worker)
model = torch.nn.Linear(10, 1)                # placeholder: any torch.nn.Module
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)  # or Adam
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     op=hvd.Average)

So what exactly does hvd.DistributedOptimizer() do here? This post analyzes it.

horovod/horovod/torch/optimizer.py

Simply put, Horovod merely wraps the optimizer provided by PyTorch. The source code is shown below, and this post will walk through it piece by piece.

def DistributedOptimizer(optimizer, named_parameters=None,
                         compression=Compression.none,
                         backward_passes_per_step=1,
                         op=Average,
                         gradient_predivide_factor=1.0,
                         num_groups=0, groups=None,
                         sparse_as_dense=False,
                         process_set=global_process_set):
    """
    An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
    combine gradient values before applying gradients to model weights.

    Allreduce operations are executed after each gradient is computed by ``loss.backward()``
    in parallel with each other. The ``step()`` method ensures that all allreduce operations are
    finished before applying gradients to the model.

    DistributedOptimizer exposes the ``synchronize()`` method, which forces allreduce operations
    to finish before continuing the execution. It's useful in conjunction with gradient
    clipping, or other operations that modify gradients in place before ``step()`` is executed.
    Make sure to use ``optimizer.skip_synchronize()`` if you're calling ``synchronize()``
    in your code.

    Example of gradient clipping:

    .. code-block:: python

        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.synchronize()
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
        with optimizer.skip_synchronize():
            optimizer.step()

    Arguments:
        optimizer: Optimizer to use for computing gradients and applying updates.
        named_parameters: A mapping between parameter names and values. Used for naming of
                          allreduce operations. Typically just ``model.named_parameters()``.
        compression: Compression algorithm used during allreduce to reduce the amount
                     of data sent during the each parameter update step.  Defaults to
                     not using compression.
        backward_passes_per_step: Number of expected backward passes to perform
                                  before calling step()/synchronize(). This
                                  allows accumulating gradients over multiple
                                  mini-batches before reducing and applying them.
        op: The reduction operation to use when combining gradients across different ranks.
        gradient_predivide_factor: If op == Average, gradient_predivide_factor splits the averaging
                                   before and after the sum. Gradients are scaled by
                                   1.0 / gradient_predivide_factor before the sum and
                                   gradient_predivide_factor / size after the sum.
        num_groups: Number of groups to assign gradient allreduce ops to for explicit
                    grouping. Defaults to no explicit groups.
        groups: The parameter to group the gradient allreduce ops. Accept values is a
                non-negative integer or a list of list of torch.Tensor.
                If groups is a non-negative integer, it is the number of groups to assign
                gradient allreduce ops to for explicit grouping.
                If groups is a list of list of torch.Tensor. Tensors in the same
                inner list will be assigned to the same group, while parameter that does
                not appear in any list will form a group itself.
                Defaults as None, which is no explicit groups.
        sparse_as_dense: If set True, convert all sparse gradients to dense and perform allreduce, then
                         convert back to sparse before applying the update.
        process_set: Gradients will only be reduced over Horovod processes belonging
                     to this process set. Defaults to the global process set.
    """
    # We dynamically create a new class that inherits from the optimizer that was passed in.
    # The goal is to override the `step()` method with an allreduce implementation.
    if gradient_predivide_factor != 1.0:
        if rocm_built():
            raise ValueError('gradient_predivide_factor not supported yet with ROCm')
        if op != Average:
            raise ValueError('gradient_predivide_factor not supported with op != Average')

    if num_groups != 0:
        warnings.warn('Parameter `num_groups` has been replaced by `groups` '
                      'and will be removed in v0.23.0.', DeprecationWarning)
        if groups is None:
            groups = num_groups

    if op != Adasum or size() == 1:
        cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                   dict(_DistributedOptimizer.__dict__))
        return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step, op,
                   gradient_predivide_factor, groups, sparse_as_dense, process_set)
    else:
        if process_set != global_process_set:
            raise NotImplementedError("Adasum does not support non-global process sets yet.")
        cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                   dict(_DistributedAdasumOptimizer.__dict__))
        return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)

1. comments

comment 1

    """
    An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
    combine gradient values before applying gradients to model weights.

    Allreduce operations are executed after each gradient is computed by ``loss.backward()``
    in parallel with each other. The ``step()`` method ensures that all allreduce operations are
    finished before applying gradients to the model.

    DistributedOptimizer exposes the ``synchronize()`` method, which forces allreduce operations
    to finish before continuing the execution. It's useful in conjunction with gradient
    clipping, or other operations that modify gradients in place before ``step()`` is executed.
    Make sure to use ``optimizer.skip_synchronize()`` if you're calling ``synchronize()``
    in your code.
    """

comment 2

# We dynamically create a new class that inherits from the optimizer that was passed in.
# The goal is to override the `step()` method with an allreduce implementation.

Let's start with the comments. Their key points are summarized below; a minimal training-loop sketch follows the list.

  • using an allreduce to combine gradient values before applying gradients to model ...
  • allreduce operations are executed after each gradient is computed by loss.backward()
  • the step() method ensures that all allreduce operations are finished before applying gradients to the model
  • create a new class that inherits from the optimizer that was passed in
  • the goal of the function is to override the step() method with an allreduce
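Expressed as a training loop, the behavior the docstring describes looks roughly like the sketch below. This is only an illustration of the ordering, not Horovod's internal code; model, data, and target are placeholders carried over from the earlier pseudo-code.

import torch.nn.functional as F

# one training step with the wrapped optimizer (sketch)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()    # allreduce ops are launched as each gradient is computed
optimizer.step()   # waits for all allreduce ops to finish, then applies the updates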

This raises a few questions.

  • How does it know when loss.backward() has finished?
  • How can step() guarantee that every allreduce operation has finished?
  • What does it mean to override step()?

Since the ultimate goal is to create a new class, looking at the return statement seems like the right place to start.

2. return

if op != Adasum or size() == 1:
    cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
               dict(_DistributedOptimizer.__dict__))
    return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step, op,
               gradient_predivide_factor, groups, sparse_as_dense, process_set)
else:
    if process_set != global_process_set:
        raise NotImplementedError("Adasum does not support non-global process sets yet.")
    cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
               dict(_DistributedAdasumOptimizer.__dict__))
    return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)

Looking at the return section, we can see that a new class is created with the built-in type(name, bases, dict). Its arguments are as follows (a standalone sketch of the mechanism follows the list).

  • name - optimizer.__class__.__name__ → SGD
  • bases - (optimizer.__class__,) → (<class 'torch.optim.sgd.SGD'>,)
  • dict - _DistributedOptimizer.__dict__ → {'__module__': 'horovod.torch.optimizer', '__init__': <function _DistributedOptimizer.__init__ at 0x7fc3d4b4d9d0>, ...
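Before going back to the Horovod code, here is a standalone sketch of what type(name, bases, dict) does. Base and Mixin are made-up names standing in for torch.optim.SGD and _DistributedOptimizer; only the mechanism is the same.

# toy illustration of type(name, bases, dict)
class Base:
    def step(self):
        return "base step"

class Mixin(Base):
    def step(self):        # this method replaces Base.step in the new class
        return "overridden step"

cls = type(Base.__name__, (Base,), dict(Mixin.__dict__))
obj = cls()
print(cls)         # <class '__main__.Base'> -- same name as Base, analogous to horovod.torch.optimizer.SGD
print(obj.step())  # overridden step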

As shown above, the new class is built from (1) the class name of the optimizer passed in as a parameter, (2) the optimizer's class as the base, and (3) all the methods and fields of Horovod's _DistributedOptimizer class. Printing the generated class gives the following.

<class 'horovod.torch.optimizer.SGD'>

It prints this way because the class itself was printed rather than an instance of it. Instantiating the class and printing the object gives the following.

SGD (
Parameter Group 0
    dampening: 0
    foreach: None
    lr: 0.01
    maximize: False
    momentum: 0.5
    nesterov: False
    weight_decay: 0
)

Additionally, if you follow the class hierarchy upward, torch.optim.optimizer.py defines Optimizer as inheriting from the object class, as shown below.

class Optimizer(object):
	...

class object():
	...
	def __repr__(self) -> str: ...  # noqa: Y029
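The formatted output above ultimately comes from a __repr__ found along this inheritance chain (torch.optim.Optimizer overrides object's default with one that prints the parameter groups). One way to see the chain of the dynamically created class is to print its MRO; this is a sketch assuming optimizer is the wrapped instance from the first snippet, and the exact output may vary by version.

# inspect the method resolution order of the dynamically created class (sketch)
print(type(optimizer).__mro__)
# expected shape of the output:
# (<class 'horovod.torch.optimizer.SGD'>, <class 'torch.optim.sgd.SGD'>,
#  <class 'torch.optim.optimizer.Optimizer'>, <class 'object'>)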

conclusion

Further exploration will be covered in the next post.
