이전 포스팅에서 horovod 에서 제공하는 function 의 목적과 리턴값을 살펴보았다.
그리고 다음과 같은 궁금증이 생겼다.
본 포스팅은 이 궁금증을 해결하기 위해 쓰여졌다. 작성하다 보니 글이 길어져 (2), (3) 으로 나누어서 포스팅 할 예정이다.
horovod/horovod/torch/optimizer.py
이 소스코드는 크게 세가지로 이루어져 있는데, 첫번째는 _DistributedOptimizer()
클래스이며, 두번째는 이 클래스를 리턴하는 DistributedOptimizer()
함수이다. 알다시피 첫번째 포스팅에서 다뤘다. 세번째는 _DistributedAdasumOptimizer()
클래스인데, 나중에 기회가 있으면 다뤄보도록 하겠다.
class _DistributedOptimizer(torch.optim.Optimizer)
앞서 구술했듯, 위 클래스의 목적은 step()
함수를 override 하는 것이다. 따라서 __init__()
함수 다음으로 step()
함수를 살펴보겠다. 포스팅을 하다 보니 글이 길어 step()
함수는 다음 포스팅에서 다룰 예정이다.
def __init__()
클래스를 처음 생성할 때 실행되는 함수이다. 소스코드를 분석하기에 앞서 함수에 사용되는 파라미터를 분석해보겠다.
params
위 값은 optimizer.param_groups
값과 동일하다. param_group
은 optimizer 가 처음 생성될 때 전달받은 model.paramters()
가 list 형태로 저장된 값이다. 함수는 다음과 같다.
def parameters(self, recurse: bool = True) -> Iterator[Parameter]:
r"""Returns an iterator over module parameters.
This is typically passed to an optimizer.
Args:
recurse (bool): if True, then yields parameters of this module
and all submodules. Otherwise, yields only parameters that
are direct members of this module.
Yields:
Parameter: module parameter
Example::
>>> for param in model.parameters():
>>> print(type(param), param.size())
<class 'torch.Tensor'> (20L,)
<class 'torch.Tensor'> (20L, 1L, 5L, 5L)
"""
for name, param in self.named_parameters(recurse=recurse):
yield param
named_parameters
위 값은 기존의 optimizer와 달리, horovod distributed optimizer 를 생성할 때 '추가로' 전달받는 model.named_parameter()
의 값이다. 함수는 다음과 같다.
def named_parameters(self, prefix: str = '', recurse: bool = True) -> Iterator[Tuple[str, Parameter]]:
r"""Returns an iterator over module parameters, yielding both the
name of the parameter as well as the parameter itself.
Args:
prefix (str): prefix to prepend to all parameter names.
recurse (bool): if True, then yields parameters of this module
and all submodules. Otherwise, yields only parameters that
are direct members of this module.
Yields:
(string, Parameter): Tuple containing the name and parameter
Example::
>>> for name, param in self.named_parameters():
>>> if name in ['bias']:
>>> print(param.size())
"""
gen = self._named_members(
lambda module: module._parameters.items(),
prefix=prefix, recurse=recurse)
for elem in gen:
yield elem
compression
위 값은 distributed optimizer가 allreduce algorithm 을 실행할 때 데이터 크기를 줄이기 위해서 compression 을 할지 결정하는 값이다. 기본 값은 False 이다.
backward_passes_per_step=1
위 값은 step()
과synchronize()
함수를 호출하기 전 몇개의 backward pass 를 허용할 것인지에 대한 값이다. 보통 gradient를 구한 뒤 바로 communication 을 진행하므로 기본값은 1이다.
op=Average
위 값은, gradient를 네트워크를 통해 주고 받을 때 사용하는 operation 이다. gradient 가 Nx1 차원의 vector라고 할 때, 분산 트레이닝에 사용되는 GPUs (= N) 들에게서 동일한 차원의 gradient가 총 N개 생성될 것이다. 그 때, gradient를 NxN 차원으로 나열하는 것이 아닌, 더하든 빼든 평균을 구하는 등의 연산이 필요하다. 기본값은 average 연산이다.
gradient_predivide_factor=1.0
만약 average 연산을 사용한다면, 위 값은 평균 연산을 진행하기 전 gradient의 값을 위 값으로 나눈다. 그리고 평균 연산 이후 다시 값을 키운다. 위 값을 왜 사용하느냐 생각해보았는데, 아마 자료형으로 FP16을 사용했을 경우, 표현할 수 있는 수의 범위가 그렇게 크지 않기 때문에 sum 연산을 진행했을 때 결과 값이 FP16이 표현할 수 있는 범위를 넘어설 수 있기 때문에 마련한 것 같다. 만약 값이 100이고 GPU 가 만개라면 그 때의 평균값은 100 x 10000 / 10000 일텐데, 이때를 대비하여 설정한 값인 것 같다.
groups=None
allreduce algorithm을 grouping 하는 용도로 사용된다. 쉽게 말해 32개에서 분산 학습을 진행하는데, 16개 씩 allreduce를 하기 위해서는 group=16
이 되어야 한다. 허용되는 값은 non-negative integer
혹은 list of list of torch.Tensor
이다. 만약 전자의 값이면 주어진 값을 활용하여 group을 만들고, 만약 후자의 값이면 동일한 inner list
에 존재하는 tensor 가 같은 group으로 취급된다. inner list
가 약간 추상적으로 들리는데, 더 분석은 진행하지 않겠다. 보통은 None
값으로 둔다.
sparse_as_dense=False
위 값을 True로 설정하면, sparse 한 gradient를 통신할 때, dense 하게 만들어 통신한 뒤 update 를 진행하기 직전 다시 sparse로 만든다. sparse 와 dense 의 차이점은 추후 포스팅에서 다뤄볼 예정이다.
process_set=global_process_set
gradient는 정해진 process set
에서만 통신된다. 크게 신경쓸 일은 없지만 global_process_set
에 대해 추가적으로 알아보겠다. 위 값은 다음과 같이 mpi_ops에서 import 한다.
from horovod.torch.mpi_ops import ProcessSet, global_process_set
위 global_process_set
의 값은 ProcessSet()
이라는 클래스로 정의한다.
global_process_set = ProcessSet([])
ProcessSet()
은 다음과 같이 정의되어 있다.
horovod/horovod/common/process_sets.py
class ProcessSet:
""" Representation of a set of Horovod processes that will run collective operations together
Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init()
or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric
value.
"""
process_set_id = None
ranks = None
mpi_comm = None
def __init__(self, ranks_or_comm: Union[Sequence[int], MPI.Comm]):
if is_iterable(ranks_or_comm):
ranks_or_comm = sorted(ranks_or_comm)
if any(not isinstance(rk, int) for rk in ranks_or_comm):
raise ValueError(
"ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object")
self.ranks = ranks_or_comm
else:
assert _basics is not None, "process_sets._setup() must be called first"
if not _basics.mpi_built():
raise ValueError(
"Apparently you tried to build a ProcessSet from an MPI communicator, "
"but Horovod has not been built with MPI support. Ensure MPI is installed and "
"reinstall Horovod with HOROVOD_WITH_MPI=1 to debug the build error.")
from mpi4py import MPI
if not isinstance(ranks_or_comm, MPI.Comm):
raise ValueError(
"ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object")
self.mpi_comm = ranks_or_comm
def _invalidate(self):
self.process_set_id = None
def size(self) -> Optional[int]:
""" Return size of the process set or None if not initialized. """
if self.process_set_id is None:
return None
return _basics._process_set_size(self.process_set_id)
def rank(self) -> Optional[int]:
""" Return rank relative to this process set or None if not initialized.
This is useful, e.g., to process the result of hvd.allgather().
Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this
relative rank, but by the global rank as obtained from hvd.rank().
"""
if self.process_set_id is None:
return None
return _basics._process_set_rank(self.process_set_id)
def included(self) -> Optional[bool]:
""" Return whether the current process is part of this process set or None if not initialized. """
if self.ranks is None:
return None
return _basics.rank() in self.ranks
def __str__(self) -> str:
return f"ProcessSet(process_set_id={self.process_set_id}, ranks={self.ranks}, mpi_comm={self.mpi_comm})"
이해가 어려워 출력을 해보았는데, default 값이 미리 horovod.init()
으로 정의되는 듯 했다.
def __init__()
다시 돌아와, def __init__()
의 코드 구성을 하나하나 분석해보겠다.
code block 1
아래 코드는 super 오브젝트인 torch.optim.Optimizer
를 param
값을 활용하여 초기화 시켜주는 단계이다. 추가적으로 compression 유무도 초기화시켰다.
def __init__(self, params, named_parameters, compression,
backward_passes_per_step=1, op=Average,
gradient_predivide_factor=1.0,
groups=None,
sparse_as_dense=False,
process_set=global_process_set):
super(self.__class__, self).__init__(params)
self._compression = compression
code block 2
아래 코드는 '추가적으로' 전달받은 named_parameter 값을 활용하여 list
형태로 저장하는 단계이다. 만약 None
값을 전달받았으면, param
값을 활용하여 초기화 시킨 self.param_groups
값으로 임의의 값을 생성한다. 그리고 name_parameter
값을 초기화 했으면 값이 tuple
형태를 갖는지 확인하고, duplicate된 값이 없는지 검사한다.
if named_parameters is not None:
named_parameters = list(named_parameters)
else:
named_parameters = [(f'allreduce.noname.{i}.{j}', v)
for i, param_group in enumerate(self.param_groups)
for j, v in enumerate(param_group['params'])]
# make sure that named_parameters are tuples
if any([not isinstance(p, tuple) for p in named_parameters]):
raise ValueError('named_parameters should be a sequence of '
'tuples (name, parameter), usually produced by '
'model.named_parameters().')
dups = _DistributedOptimizer.find_duplicates([k for k, _ in named_parameters])
if len(dups) > 0:
raise ValueError('Parameter names in named_parameters must be unique. '
'Found duplicates: %s' % ', '.join(dups))
code block 3
unnamed 된 파라미터를 검사한다. self.param_groups
값을 활용하여 all_param_id를 만들고, named_parameter로 다시 named_param_id 를 만든다. 그리고 만약 이름이 정해지지 않은 파미터 있을 경우, model 이 갖는 파라미터에 오류가 있다고 판단. 에러를 발생시킨다.
all_param_ids = {id(v)
for param_group in self.param_groups
for v in param_group['params']}
named_param_ids = {id(v) for k, v in named_parameters}
unnamed_param_ids = all_param_ids - named_param_ids
if len(unnamed_param_ids):
raise ValueError('named_parameters was specified, but one or more model '
'parameters were not named. Python object ids: '
'%s' % ', '.join(str(id) for id in unnamed_param_ids))
code block 4
전달받은 나머지 파라미터를 활용하여 오브젝트 변수를 초기화 한다.
self._parameter_names = {v: k for k, v in sorted(named_parameters)}
self.backward_passes_per_step = backward_passes_per_step
self._allreduce_delay = {v: self.backward_passes_per_step
for _, v in sorted(named_parameters)}
self.op = op
self.gradient_predivide_factor = gradient_predivide_factor
self.sparse_as_dense = sparse_as_dense
self.process_set = process_set
self._handles = {}
self._grad_accs = []
self._requires_update = set()
self._synchronized = False
self._should_synchronize = True
if groups is not None:
if not (isinstance(groups, list) or groups > 0):
raise ValueError('groups should be a non-negative integer or '
'a list of list of torch.Tensor.')
if isinstance(groups, list):
grouped_parameter_ids = set()
for l in groups:
for p in l:
if not isinstance(p, torch.Tensor):
raise ValueError('groups must consist of torch.Tensor.')
if id(p) in grouped_parameter_ids:
raise ValueError('A parameter can only appear once in groups.')
grouped_parameter_ids.add(id(p))
self._groups = groups
self._p_to_group = {}
self._group_counts = {}
code block 5
아래 코드 블럭이 눈여겨볼만 하다. 만약 process set이 현재 global process 에 포함되어 있고, horovod process가 1개 이상이면 self._register_hook()
을 호출한다.
if self.process_set.included() and (size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1'):
self._register_hooks()
별도로 hook()
을 등록한다는 것이 흥미로웠다. 다음 포스팅에 _register_hook()
에 대해 추가적으로 다룰 예정입니다.