There are several frameworks worth knowing when implementing distributed training. Horovod is one of them: a general distributed training framework that works independently of the deep learning framework underneath. Its usage can be summarized briefly as follows, using PyTorch as an example.
# pseudo-code
import torch
import horovod.torch as hvd

hvd.init()                       # initialize Horovod (one process per worker)

model = ...                      # any torch.nn.Module
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)  # or Adam

# wrap the optimizer so gradients are allreduced across workers
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     op=hvd.Average)
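After wrapping, the training loop looks like ordinary single-process PyTorch. A minimal sketch, assuming model, train_loader, and a loss function loss_fn already exist:

for data, target in train_loader:
    optimizer.zero_grad()
    loss = loss_fn(model(data), target)
    loss.backward()   # Horovod's hooks launch an allreduce for each gradient here
    optimizer.step()  # waits for all allreduce ops to finish, then applies the update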
So what exactly does hvd.DistributedOptimizer() do here? This post analyzes that, working through the following file:
horovod/horovod/torch/optimizer.py
Simply put, Horovod merely wraps the optimizer that PyTorch provides. The source code is shown below, and this post walks through it piece by piece.
def DistributedOptimizer(optimizer, named_parameters=None,
                         compression=Compression.none,
                         backward_passes_per_step=1,
                         op=Average,
                         gradient_predivide_factor=1.0,
                         num_groups=0, groups=None,
                         sparse_as_dense=False,
                         process_set=global_process_set):
    """
    An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
    combine gradient values before applying gradients to model weights.

    Allreduce operations are executed after each gradient is computed by ``loss.backward()``
    in parallel with each other. The ``step()`` method ensures that all allreduce operations are
    finished before applying gradients to the model.

    DistributedOptimizer exposes the ``synchronize()`` method, which forces allreduce operations
    to finish before continuing the execution. It's useful in conjunction with gradient
    clipping, or other operations that modify gradients in place before ``step()`` is executed.

    Make sure to use ``optimizer.skip_synchronize()`` if you're calling ``synchronize()``
    in your code.

    Example of gradient clipping:

    .. code-block:: python

        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.synchronize()
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
        with optimizer.skip_synchronize():
            optimizer.step()

    Arguments:
        optimizer: Optimizer to use for computing gradients and applying updates.
        named_parameters: A mapping between parameter names and values. Used for naming of
                          allreduce operations. Typically just ``model.named_parameters()``.
        compression: Compression algorithm used during allreduce to reduce the amount
                     of data sent during the each parameter update step. Defaults to
                     not using compression.
        backward_passes_per_step: Number of expected backward passes to perform
                                  before calling step()/synchronize(). This
                                  allows accumulating gradients over multiple
                                  mini-batches before reducing and applying them.
        op: The reduction operation to use when combining gradients across different ranks.
        gradient_predivide_factor: If op == Average, gradient_predivide_factor splits the averaging
                                   before and after the sum. Gradients are scaled by
                                   1.0 / gradient_predivide_factor before the sum and
                                   gradient_predivide_factor / size after the sum.
        num_groups: Number of groups to assign gradient allreduce ops to for explicit
                    grouping. Defaults to no explicit groups.
        groups: The parameter to group the gradient allreduce ops. Accept values is a
                non-negative integer or a list of list of torch.Tensor.
                If groups is a non-negative integer, it is the number of groups to assign
                gradient allreduce ops to for explicit grouping.
                If groups is a list of list of torch.Tensor. Tensors in the same
                inner list will be assigned to the same group, while parameter that does
                not appear in any list will form a group itself.
                Defaults as None, which is no explicit groups.
        sparse_as_dense: If set True, convert all sparse gradients to dense and perform allreduce, then
                         convert back to sparse before applying the update.
        process_set: Gradients will only be reduced over Horovod processes belonging
                     to this process set. Defaults to the global process set.
    """
    # We dynamically create a new class that inherits from the optimizer that was passed in.
    # The goal is to override the `step()` method with an allreduce implementation.
    if gradient_predivide_factor != 1.0:
        if rocm_built():
            raise ValueError('gradient_predivide_factor not supported yet with ROCm')
        if op != Average:
            raise ValueError('gradient_predivide_factor not supported with op != Average')

    if num_groups != 0:
        warnings.warn('Parameter `num_groups` has been replaced by `groups` '
                      'and will be removed in v0.23.0.', DeprecationWarning)
        if groups is None:
            groups = num_groups

    if op != Adasum or size() == 1:
        cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                   dict(_DistributedOptimizer.__dict__))
        return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step, op,
                   gradient_predivide_factor, groups, sparse_as_dense, process_set)
    else:
        if process_set != global_process_set:
            raise NotImplementedError("Adasum does not support non-global process sets yet.")
        cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                   dict(_DistributedAdasumOptimizer.__dict__))
        return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)
1. Comments
comment 1
"""
An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
combine gradient values before applying gradients to model weights.
Allreduce operations are executed after each gradient is computed by ``loss.backward()``
in parallel with each other. The ``step()`` method ensures that all allreduce operations are
finished before applying gradients to the model.
DistributedOptimizer exposes the ``synchronize()`` method, which forces allreduce operations
to finish before continuing the execution. It's useful in conjunction with gradient
clipping, or other operations that modify gradients in place before ``step()`` is executed.
Make sure to use ``optimizer.skip_synchronize()`` if you're calling ``synchronize()``
in your code.
"""
comment 2
# We dynamically create a new class that inherits from the optimizer that was passed in.
# The goal is to override the `step()` method with an allreduce implementation.
Let's start with the comments. The key points are:

- Allreduce operations are executed after each gradient is computed by loss.backward(), in parallel with one another.
- The step() method ensures that all allreduce operations are finished before applying the gradients to the model.
- A new class is created dynamically, overriding the step() method with an allreduce implementation.

This raises a few questions:

- How does Horovod know that loss.backward() has finished computing a gradient?
- How can step() guarantee that every allreduce has finished?
- What does it actually mean to override step()?

Since the ultimate goal is to create a new class, the return statement seems like the right place to look.
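Before moving on to the return statement, here is a hint at the first question. PyTorch lets you register a hook that fires as soon as a gradient for a tensor has been computed. The snippet below is a minimal standalone sketch of that mechanism only, not Horovod's actual implementation:

import torch

w = torch.nn.Parameter(torch.randn(3))

def on_grad(grad):
    # a distributed optimizer could launch an asynchronous allreduce here
    print("gradient ready:", grad.shape)
    return grad

w.register_hook(on_grad)   # called every time a gradient for w is computed
loss = (w * 2).sum()
loss.backward()            # prints "gradient ready: torch.Size([3])"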
2. return
if op != Adasum or size() == 1:
    cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
               dict(_DistributedOptimizer.__dict__))
    return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step, op,
               gradient_predivide_factor, groups, sparse_as_dense, process_set)
else:
    if process_set != global_process_set:
        raise NotImplementedError("Adasum does not support non-global process sets yet.")
    cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
               dict(_DistributedAdasumOptimizer.__dict__))
    return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)
Looking at the return statement, we can see that a new class is created with the built-in type(name, bases, dict) function. With the SGD optimizer from the pseudo-code above, each argument evaluates to the following:
- name: optimizer.__class__.__name__ → SGD
- bases: (optimizer.__class__,) → (<class 'torch.optim.sgd.SGD'>,)
- dict: _DistributedOptimizer.__dict__ → {'__module__': 'horovod.torch.optimizer', '__init__': <function _DistributedOptimizer.__init__ at 0x7fc3d4b4d9d0>, ...
As shown above, the new class is built from (1) the class name of the optimizer passed in as a parameter, (2) the optimizer's class itself as the base, and (3) all the methods and fields of Horovod's _DistributedOptimizer class. Printing the generated class gives:
<class 'horovod.torch.optimizer.SGD'>
It prints this way because we printed the class itself rather than creating an instance of it. If we instantiate the class and print the resulting object, we get the following:
SGD (
Parameter Group 0
dampening: 0
foreach: None
lr: 0.01
maximize: False
momentum: 0.5
nesterov: False
weight_decay: 0
)
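To make the dynamic class creation concrete, here is a minimal self-contained sketch of the same type(name, bases, dict) pattern. The _LoggingMixin class and its printing step() are made up for illustration; only the wrapping pattern mirrors DistributedOptimizer:

import torch

class _LoggingMixin(torch.optim.Optimizer):
    # stand-in for _DistributedOptimizer: the only method we want to graft onto SGD
    def step(self, closure=None):
        print("pretend all allreduce ops have finished here")
        # explicit two-argument super(), the same pattern Horovod uses,
        # because this function ends up inside a dynamically built class
        return super(self.__class__, self).step(closure)

param = torch.nn.Parameter(torch.zeros(3))
base_opt = torch.optim.SGD([param], lr=0.01)

cls = type(base_opt.__class__.__name__,    # (1) keep the name "SGD"
           (base_opt.__class__,),          # (2) inherit from torch.optim.SGD
           dict(_LoggingMixin.__dict__))   # (3) methods and fields from the mixin

opt = cls(base_opt.param_groups)           # rebuild an optimizer from the same param_groups
print(cls)                                 # <class '__main__.SGD'> when run as a script, not torch.optim.sgd.SGD
param.grad = torch.ones(3)
opt.step()                                 # prints the message, then runs the real SGD update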
In addition, if we follow the class hierarchy upward, the Optimizer base class in torch/optim/optimizer.py is defined as inheriting from the object class:
class Optimizer(object):
    ...

class object():
    ...
    def __repr__(self) -> str: ...  # noqa: Y029
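As a quick sanity check that ties this together (assuming Horovod is installed; a single process is enough for illustration), the wrapped optimizer is still an instance of the original torch.optim.SGD and of torch.optim.Optimizer, because the dynamically created class inherits from them:

import torch
import horovod.torch as hvd

hvd.init()
model = torch.nn.Linear(4, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

print(type(optimizer).__mro__)
# (<class 'horovod.torch.optimizer.SGD'>, <class 'torch.optim.sgd.SGD'>,
#  <class 'torch.optim.optimizer.Optimizer'>, <class 'object'>)
print(isinstance(optimizer, torch.optim.SGD), isinstance(optimizer, torch.optim.Optimizer))  # True True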
Further exploration will be covered in the next post.