Distributed Training - Horovod (2)

steadycode·2022년 11월 23일
0

이전 포스팅에서 horovod 에서 제공하는 function 의 목적과 리턴값을 살펴보았다.

  • using an allreduce to combine gradient values before applying gradients to model ...
  • allreduce operations are executed after each gradient is computed by loss.backward()
  • the step() method ensures that all allreduce operations are finished before applying gradients to the model
  • create a new class that inherits from the optimizer that was passed in the goal of the function is to override the step() method with an allreduce

그리고 다음과 같은 궁금증이 생겼다.

  • 어떻게 loss.backward()가 끝난 것을 알 수 있을까
  • step()이 어떻게 모든 allreduce가 끝나는 것을 보장할 수 있을까
  • step()을 override 한다는 것이 무엇일까

본 포스팅은 이 궁금증을 해결하기 위해 쓰여졌다. 작성하다 보니 글이 길어져 (2), (3) 으로 나누어서 포스팅 할 예정이다.


horovod/horovod/torch/optimizer.py

이 소스코드는 크게 세가지로 이루어져 있는데, 첫번째는 _DistributedOptimizer() 클래스이며, 두번째는 이 클래스를 리턴하는 DistributedOptimizer() 함수이다. 알다시피 첫번째 포스팅에서 다뤘다. 세번째는 _DistributedAdasumOptimizer() 클래스인데, 나중에 기회가 있으면 다뤄보도록 하겠다.


class _DistributedOptimizer(torch.optim.Optimizer)

앞서 구술했듯, 위 클래스의 목적은 step() 함수를 override 하는 것이다. 따라서 __init__() 함수 다음으로 step() 함수를 살펴보겠다. 포스팅을 하다 보니 글이 길어 step() 함수는 다음 포스팅에서 다룰 예정이다.


def __init__()

클래스를 처음 생성할 때 실행되는 함수이다. 소스코드를 분석하기에 앞서 함수에 사용되는 파라미터를 분석해보겠다.

  • params

위 값은 optimizer.param_groups 값과 동일하다. param_group은 optimizer 가 처음 생성될 때 전달받은 model.paramters()가 list 형태로 저장된 값이다. 함수는 다음과 같다.

def parameters(self, recurse: bool = True) -> Iterator[Parameter]:
        r"""Returns an iterator over module parameters.

        This is typically passed to an optimizer.

        Args:
            recurse (bool): if True, then yields parameters of this module
                and all submodules. Otherwise, yields only parameters that
                are direct members of this module.

        Yields:
            Parameter: module parameter

        Example::

            >>> for param in model.parameters():
            >>>     print(type(param), param.size())
            <class 'torch.Tensor'> (20L,)
            <class 'torch.Tensor'> (20L, 1L, 5L, 5L)

        """
        for name, param in self.named_parameters(recurse=recurse):
            yield param
  • named_parameters

위 값은 기존의 optimizer와 달리, horovod distributed optimizer 를 생성할 때 '추가로' 전달받는 model.named_parameter()의 값이다. 함수는 다음과 같다.

def named_parameters(self, prefix: str = '', recurse: bool = True) -> Iterator[Tuple[str, Parameter]]:
        r"""Returns an iterator over module parameters, yielding both the
        name of the parameter as well as the parameter itself.

        Args:
            prefix (str): prefix to prepend to all parameter names.
            recurse (bool): if True, then yields parameters of this module
                and all submodules. Otherwise, yields only parameters that
                are direct members of this module.

        Yields:
            (string, Parameter): Tuple containing the name and parameter

        Example::

            >>> for name, param in self.named_parameters():
            >>>    if name in ['bias']:
            >>>        print(param.size())

        """
        gen = self._named_members(
            lambda module: module._parameters.items(),
            prefix=prefix, recurse=recurse)
        for elem in gen:
            yield elem
  • compression

위 값은 distributed optimizer가 allreduce algorithm 을 실행할 때 데이터 크기를 줄이기 위해서 compression 을 할지 결정하는 값이다. 기본 값은 False 이다.

  • backward_passes_per_step=1

위 값은 step()synchronize() 함수를 호출하기 전 몇개의 backward pass 를 허용할 것인지에 대한 값이다. 보통 gradient를 구한 뒤 바로 communication 을 진행하므로 기본값은 1이다.

  • op=Average

위 값은, gradient를 네트워크를 통해 주고 받을 때 사용하는 operation 이다. gradient 가 Nx1 차원의 vector라고 할 때, 분산 트레이닝에 사용되는 GPUs (= N) 들에게서 동일한 차원의 gradient가 총 N개 생성될 것이다. 그 때, gradient를 NxN 차원으로 나열하는 것이 아닌, 더하든 빼든 평균을 구하는 등의 연산이 필요하다. 기본값은 average 연산이다.

  • gradient_predivide_factor=1.0

만약 average 연산을 사용한다면, 위 값은 평균 연산을 진행하기 전 gradient의 값을 위 값으로 나눈다. 그리고 평균 연산 이후 다시 값을 키운다. 위 값을 왜 사용하느냐 생각해보았는데, 아마 자료형으로 FP16을 사용했을 경우, 표현할 수 있는 수의 범위가 그렇게 크지 않기 때문에 sum 연산을 진행했을 때 결과 값이 FP16이 표현할 수 있는 범위를 넘어설 수 있기 때문에 마련한 것 같다. 만약 값이 100이고 GPU 가 만개라면 그 때의 평균값은 100 x 10000 / 10000 일텐데, 이때를 대비하여 설정한 값인 것 같다.

  • groups=None

allreduce algorithm을 grouping 하는 용도로 사용된다. 쉽게 말해 32개에서 분산 학습을 진행하는데, 16개 씩 allreduce를 하기 위해서는 group=16이 되어야 한다. 허용되는 값은 non-negative integer 혹은 list of list of torch.Tensor이다. 만약 전자의 값이면 주어진 값을 활용하여 group을 만들고, 만약 후자의 값이면 동일한 inner list 에 존재하는 tensor 가 같은 group으로 취급된다. inner list가 약간 추상적으로 들리는데, 더 분석은 진행하지 않겠다. 보통은 None 값으로 둔다.

  • sparse_as_dense=False

위 값을 True로 설정하면, sparse 한 gradient를 통신할 때, dense 하게 만들어 통신한 뒤 update 를 진행하기 직전 다시 sparse로 만든다. sparse 와 dense 의 차이점은 추후 포스팅에서 다뤄볼 예정이다.

  • process_set=global_process_set

gradient는 정해진 process set 에서만 통신된다. 크게 신경쓸 일은 없지만 global_process_set에 대해 추가적으로 알아보겠다. 위 값은 다음과 같이 mpi_ops에서 import 한다.

from horovod.torch.mpi_ops import ProcessSet, global_process_set

global_process_set의 값은 ProcessSet() 이라는 클래스로 정의한다.

global_process_set = ProcessSet([])

ProcessSet()은 다음과 같이 정의되어 있다.

horovod/horovod/common/process_sets.py

class ProcessSet:
    """ Representation of a set of Horovod processes that will run collective operations together

    Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init()
    or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric
    value.
    """
    process_set_id = None
    ranks = None
    mpi_comm = None

    def __init__(self, ranks_or_comm: Union[Sequence[int], MPI.Comm]):
        if is_iterable(ranks_or_comm):
            ranks_or_comm = sorted(ranks_or_comm)
            if any(not isinstance(rk, int) for rk in ranks_or_comm):
                raise ValueError(
                    "ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object")
            self.ranks = ranks_or_comm
        else:
            assert _basics is not None, "process_sets._setup() must be called first"
            if not _basics.mpi_built():
                raise ValueError(
                    "Apparently you tried to build a ProcessSet from an MPI communicator, "
                    "but Horovod has not been built with MPI support. Ensure MPI is installed and "
                    "reinstall Horovod with HOROVOD_WITH_MPI=1 to debug the build error.")
            from mpi4py import MPI
            if not isinstance(ranks_or_comm, MPI.Comm):
                raise ValueError(
                    "ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object")
            self.mpi_comm = ranks_or_comm

    def _invalidate(self):
        self.process_set_id = None

    def size(self) -> Optional[int]:
        """ Return size of the process set or None if not initialized. """
        if self.process_set_id is None:
            return None
        return _basics._process_set_size(self.process_set_id)

    def rank(self) -> Optional[int]:
        """ Return rank relative to this process set or None if not initialized.

        This is useful, e.g., to process the result of hvd.allgather().

        Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this
        relative rank, but by the global rank as obtained from hvd.rank().
        """
        if self.process_set_id is None:
            return None
        return _basics._process_set_rank(self.process_set_id)


    def included(self) -> Optional[bool]:
        """ Return whether the current process is part of this process set or None if not initialized. """
        if self.ranks is None:
            return None
        return _basics.rank() in self.ranks

    def __str__(self) -> str:
        return f"ProcessSet(process_set_id={self.process_set_id}, ranks={self.ranks}, mpi_comm={self.mpi_comm})"

이해가 어려워 출력을 해보았는데, default 값이 미리 horovod.init()으로 정의되는 듯 했다.


def __init__()

다시 돌아와, def __init__() 의 코드 구성을 하나하나 분석해보겠다.

code block 1

아래 코드는 super 오브젝트인 torch.optim.Optimizerparam 값을 활용하여 초기화 시켜주는 단계이다. 추가적으로 compression 유무도 초기화시켰다.

def __init__(self, params, named_parameters, compression,
                 backward_passes_per_step=1, op=Average,
                 gradient_predivide_factor=1.0,
                 groups=None,
                 sparse_as_dense=False,
                 process_set=global_process_set):
        super(self.__class__, self).__init__(params)
        self._compression = compression

code block 2

아래 코드는 '추가적으로' 전달받은 named_parameter 값을 활용하여 list 형태로 저장하는 단계이다. 만약 None 값을 전달받았으면, param 값을 활용하여 초기화 시킨 self.param_groups 값으로 임의의 값을 생성한다. 그리고 name_parameter 값을 초기화 했으면 값이 tuple 형태를 갖는지 확인하고, duplicate된 값이 없는지 검사한다.

        if named_parameters is not None:
            named_parameters = list(named_parameters)
        else:
            named_parameters = [(f'allreduce.noname.{i}.{j}', v)
                                for i, param_group in enumerate(self.param_groups)
                                for j, v in enumerate(param_group['params'])]
                                
        # make sure that named_parameters are tuples
        if any([not isinstance(p, tuple) for p in named_parameters]):
            raise ValueError('named_parameters should be a sequence of '
                             'tuples (name, parameter), usually produced by '
                             'model.named_parameters().')

        dups = _DistributedOptimizer.find_duplicates([k for k, _ in named_parameters])
        if len(dups) > 0:
            raise ValueError('Parameter names in named_parameters must be unique. '
                             'Found duplicates: %s' % ', '.join(dups))

code block 3

unnamed 된 파라미터를 검사한다. self.param_groups값을 활용하여 all_param_id를 만들고, named_parameter로 다시 named_param_id 를 만든다. 그리고 만약 이름이 정해지지 않은 파미터 있을 경우, model 이 갖는 파라미터에 오류가 있다고 판단. 에러를 발생시킨다.

        all_param_ids = {id(v)
                         for param_group in self.param_groups
                         for v in param_group['params']}
        named_param_ids = {id(v) for k, v in named_parameters}
        unnamed_param_ids = all_param_ids - named_param_ids
        if len(unnamed_param_ids):
            raise ValueError('named_parameters was specified, but one or more model '
                             'parameters were not named. Python object ids: '
                             '%s' % ', '.join(str(id) for id in unnamed_param_ids))

code block 4

전달받은 나머지 파라미터를 활용하여 오브젝트 변수를 초기화 한다.

        self._parameter_names = {v: k for k, v in sorted(named_parameters)}
        self.backward_passes_per_step = backward_passes_per_step
        self._allreduce_delay = {v: self.backward_passes_per_step
                                 for _, v in sorted(named_parameters)}
        self.op = op
        self.gradient_predivide_factor = gradient_predivide_factor
        self.sparse_as_dense = sparse_as_dense
        self.process_set = process_set

        self._handles = {}
        self._grad_accs = []
        self._requires_update = set()
        self._synchronized = False
        self._should_synchronize = True

        if groups is not None:
            if not (isinstance(groups, list) or groups > 0):
                raise ValueError('groups should be a non-negative integer or '
                                'a list of list of torch.Tensor.')
            if isinstance(groups, list):
                grouped_parameter_ids = set()
                for l in groups:
                    for p in l:
                        if not isinstance(p, torch.Tensor):
                            raise ValueError('groups must consist of torch.Tensor.')
                        if id(p) in grouped_parameter_ids:
                            raise ValueError('A parameter can only appear once in groups.')
                        grouped_parameter_ids.add(id(p))
        self._groups = groups
        self._p_to_group = {}
        self._group_counts = {}

code block 5

아래 코드 블럭이 눈여겨볼만 하다. 만약 process set이 현재 global process 에 포함되어 있고, horovod process가 1개 이상이면 self._register_hook() 을 호출한다.

        if self.process_set.included() and (size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1'):
            self._register_hooks()

conclusion

별도로 hook() 을 등록한다는 것이 흥미로웠다. 다음 포스팅에 _register_hook()에 대해 추가적으로 다룰 예정입니다.

profile
steadycode

0개의 댓글