In the previous post I found that constructing _DistributedOptimizer, one of Horovod's classes, also runs _register_hooks(), and I started analyzing it. That post grew too long, so the analysis continues here.
def _register_hooks()
def _register_hooks(self):
    if self._groups is not None:
        p_list = []
        # Get list of parameters with grads
        for param_group in self.param_groups:
            for p in param_group['params']:
                if p.requires_grad:
                    p_list.append(p)

        # To ensure parameter order and group formation is consistent, broadcast p_list order
        # from rank 0 and use for every worker
        p_list_names = [self._parameter_names.get(p) for p in p_list]
        p_list_names = broadcast_object(p_list_names, root_rank=0, process_set=self.process_set)
        p_list = sorted(p_list, key=lambda p: p_list_names.index(self._parameter_names.get(p)))

        # Form groups
        if isinstance(self._groups, list):
            p_groups = []
            grouped_id = set()
            p_list_ids = [id(p) for p in p_list]
            for group in self._groups:
                p_groups.append([p for p in group if id(p) in p_list_ids])
                for p in p_groups[-1]:
                    grouped_id.add(id(p))
            for p in p_list:
                if id(p) not in grouped_id:
                    p_groups.append([p])
        else:
            p_groups = split_list(p_list, self._groups)

        p_groups = [tuple(p) for p in p_groups]
        for group in p_groups:
            for p in group:
                self._p_to_group[p] = group
            self._group_counts[group] = 0

    for param_group in self.param_groups:
        for p in param_group['params']:
            if p.requires_grad:
                self._requires_update.add(p)
                p_tmp = p.expand_as(p)
                grad_acc = p_tmp.grad_fn.next_functions[0][0]
                grad_acc.register_hook(self._make_hook(p))
                self._grad_accs.append(grad_acc)
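The ordering step in the listing above can be tried without Horovod. This is only a minimal sketch under my own assumptions: plain strings stand in for parameters, and a hypothetical rank0_names list stands in for what broadcast_object() would return from rank 0.

```python
# Hypothetical stand-ins: strings play the role of parameters, and
# rank0_names plays the role of the name list broadcast from rank 0.
local_params = ["fc.bias", "conv.weight", "fc.weight"]
parameter_names = {p: p for p in local_params}   # parameter -> name lookup
rank0_names = ["conv.weight", "fc.weight", "fc.bias"]

# Same sort key as in _register_hooks(): the position of each
# parameter's name in the list that came from rank 0.
ordered = sorted(local_params,
                 key=lambda p: rank0_names.index(parameter_names.get(p)))

print(ordered)  # ['conv.weight', 'fc.weight', 'fc.bias']
```

Whatever order a worker started with, it ends up with rank 0's order, which is what lets every worker form the same groups.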
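The first half of _register_hooks() can be summarized as follows.
- Initialize p_list.
- Append every parameter with requires_grad set to p_list.
- Put the names of the parameters in p_list into p_list_names.
- Overwrite p_list_names with the return value of broadcast_object.
- Sort p_list in that order and store the result back in p_list.

The following questions came up, and the first one has been resolved.
- What role does self._requires_update play?
- What kind of function is expand_as?
- What is grad_fn?
- What is grad_acc?
- Why does self.param_groups store the parameter values under the key 'params'?
- What is _make_hook()?

expand_as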
The PyTorch documentation describes it as follows.
Expand this tensor to the same size as other. self.expand_as(other) is equivalent to self.expand(other.size()).
Since the target shape is the tensor's own shape, I expected it to return the same tensor.
import torch
x = torch.randn(4, 4, requires_grad=True)
x_tmp = x.expand_as(x)
x_same = x
print(id(x), x)
print(id(x_tmp), x_tmp)
print(id(x_same), x_same)
140455830053648 tensor([[ 0.0576, 1.8933, 1.0352, -1.1749],
[ 0.5789, -0.4150, -2.0399, -0.3701],
[-0.3757, 0.3018, -0.4324, -0.1325],
[ 0.2515, -0.1606, -0.2203, 0.6161]], requires_grad=True)
140455829732304 tensor([[ 0.0576, 1.8933, 1.0352, -1.1749],
[ 0.5789, -0.4150, -2.0399, -0.3701],
[-0.3757, 0.3018, -0.4324, -0.1325],
[ 0.2515, -0.1606, -0.2203, 0.6161]], grad_fn=<ExpandBackward0>)
140455830053648 tensor([[ 0.0576, 1.8933, 1.0352, -1.1749],
[ 0.5789, -0.4150, -2.0399, -0.3701],
[-0.3757, 0.3018, -0.4324, -0.1325],
[ 0.2515, -0.1606, -0.2203, 0.6161]], requires_grad=True)
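The tensor returned by expand_as prints a grad_fn and has a different id from x, presumably because expand_as is itself an operation that autograd records. The tensor assigned directly from x has the same id, which means it is the same object. expand_as seems to be used for the following reason.
- To obtain a tensor whose grad_fn is initialized

grad_fn and grad_acc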
Out of curiosity, I printed both with the following code.
import torch
x = torch.randn(4, 4, requires_grad=True)
x_tmp = x.expand_as(x)
print(type(x.grad_fn), x.grad_fn)
print(type(x_tmp.grad_fn), x_tmp.grad_fn)
x_grad_acc = x.grad_fn.next_functions[0][0]  # fails: x.grad_fn is None
tmp_grad_acc = x_tmp.grad_fn.next_functions[0][0]
print(x_grad_acc)
print(tmp_grad_acc)
<class 'NoneType'> None
<class 'ExpandBackward0'> <ExpandBackward0 object at 0x7f56e9d58d30>
AttributeError: 'NoneType' object has no attribute 'next_functions'
I commented out only the failing x_grad_acc lines and ran it again.
<class 'NoneType'> None
<class 'ExpandBackward0'> <ExpandBackward0 object at 0x7f5c52e54d30>
<AccumulateGrad object at 0x7f5c52d54a30>
grad_acc appears to be an AccumulateGrad object, while grad_fn appears to be an object that points to the gradient function.
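To double-check the observations so far, here is a small sketch that inspects the same tensors with is_leaf and data_ptr(). The exact class name of the grad_fn can differ between PyTorch versions, so I only print it rather than rely on it.

```python
import torch

x = torch.randn(4, 4, requires_grad=True)
x_tmp = x.expand_as(x)

# x was created directly by the user, so no operation produced it.
print(x.is_leaf, x.grad_fn)

# x_tmp is the output of an operation, so it is a non-leaf with a grad_fn.
print(x_tmp.is_leaf, type(x_tmp.grad_fn).__name__)

# Two different Python objects that view the same underlying memory.
print(x_tmp is x, x_tmp.data_ptr() == x.data_ptr())

# One step back in the graph from x_tmp is the node attached to x.
grad_acc = x_tmp.grad_fn.next_functions[0][0]
print(type(grad_acc).__name__)
```

So expand_as does not copy the data here; it only gives a handle into the graph from which the node attached to x can be reached.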
analysis of grad_fn
For a deeper analysis I consulted the following references: Reference 1, Reference 2
import torch
x = torch.randn(4, 4, requires_grad=True)
y = torch.randn(4, 4, requires_grad=True)
z = x * y
l = z.sum()
print("FP: \n")
print("X: \n",x, x.grad_fn)
print("Y: \n",y, y.grad_fn)
print("Z: \n",z, z.grad_fn)
print("L: \n",l, l.grad_fn)
# Backward pass by hand: call each graph node with the incoming gradient.
dl = torch.tensor(1.)
back_sum = l.grad_fn                      # node for sum
dz = back_sum(dl)
back_mul = back_sum.next_functions[0][0]  # node for mul
dx, dy = back_mul(dz)
back_x = back_mul.next_functions[0][0]    # AccumulateGrad for x
back_x(dx)
back_y = back_mul.next_functions[1][0]    # AccumulateGrad for y
back_y(dy)
FP:
X:
tensor([[ 1.0804, 0.2147, 0.5602, -0.1632],
[-0.8500, -1.5312, -1.4656, 0.4574],
[-1.6296, -2.2056, 0.7183, -0.4765],
[ 0.1588, 0.8230, 0.2368, 0.4921]], requires_grad=True) None
Y:
tensor([[-1.4711e+00, -1.3592e+00, 2.3243e-01, 1.2495e-01],
[-1.2825e+00, -1.0158e+00, 9.2749e-01, 4.7613e-01],
[-1.3274e+00, -4.5900e-02, 1.9388e-01, 2.3195e-01],
[-1.9465e-01, 1.8027e-03, -6.9851e-01, 1.8149e+00]],
requires_grad=True) None
Z:
tensor([[-1.5893e+00, -2.9182e-01, 1.3020e-01, -2.0395e-02],
[ 1.0901e+00, 1.5553e+00, -1.3594e+00, 2.1777e-01],
[ 2.1632e+00, 1.0124e-01, 1.3927e-01, -1.1053e-01],
[-3.0919e-02, 1.4836e-03, -1.6540e-01, 8.9309e-01]],
grad_fn=<MulBackward0>) <MulBackward0 object at 0x7f4c274b9fd0>
L:
tensor(2.7240, grad_fn=<SumBackward0>) <SumBackward0 object at 0x7f4c274b9fd0>
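The notable points are as follows.
- x and y, which were created directly, have grad_fn set to None.
- z and l, which are results of operations, get a grad_fn.
  - MulBackward0 for the mul operation
  - SumBackward0 for the sum operation

I then printed the results after backprop.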
print("BP\n")
print(x, "\n", x.grad)
print(y, "\n", y.grad)
print(z, "\n", z.grad)
print(l, "\n", l.grad)
BP
tensor([[ 0.5812, -0.4268, -0.8694, -1.7837],
[-0.1613, -0.5435, -0.3585, -0.0232],
[ 0.2461, -1.1176, 1.8555, -0.7077],
[-0.1033, -0.0230, -1.6810, 0.5903]], requires_grad=True)
tensor([[ 1.6810e+00, 7.3783e-01, 2.6994e-01, 1.2975e+00],
[ 2.0548e-03, -1.2514e+00, 9.3488e-02, -1.8246e+00],
[-2.2385e+00, 4.0009e-01, 8.1906e-01, -1.2315e+00],
[ 9.2964e-01, -1.4634e+00, 1.5285e+00, -6.5143e-01]],
grad_fn=<CopyBackwards>)
tensor([[ 1.6810e+00, 7.3783e-01, 2.6994e-01, 1.2975e+00],
[ 2.0548e-03, -1.2514e+00, 9.3488e-02, -1.8246e+00],
[-2.2385e+00, 4.0009e-01, 8.1906e-01, -1.2315e+00],
[ 9.2964e-01, -1.4634e+00, 1.5285e+00, -6.5143e-01]],
requires_grad=True)
tensor([[ 0.5812, -0.4268, -0.8694, -1.7837],
[-0.1613, -0.5435, -0.3585, -0.0232],
[ 0.2461, -1.1176, 1.8555, -0.7077],
[-0.1033, -0.0230, -1.6810, 0.5903]], grad_fn=<CopyBackwards>)
tensor([[ 9.7707e-01, -3.1493e-01, -2.3467e-01, -2.3144e+00],
[-3.3151e-04, 6.8017e-01, -3.3515e-02, 4.2267e-02],
[-5.5088e-01, -4.4712e-01, 1.5197e+00, 8.7150e-01],
[-9.6054e-02, 3.3667e-02, -2.5693e+00, -3.8454e-01]],
grad_fn=<MulBackward0>)
None
tensor(-2.8213, grad_fn=<SumBackward0>)
None
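To summarize the observations:
- Leaf tensors (x, y) do not have a grad_fn.
- Tensors produced by an operation (z, l) do have a grad_fn.
- Each grad_fn is implemented as a function under torch/csrc/autograd.

Let's go through them one by one.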
Autograd is a reverse automatic differentiation system. Conceptually, autograd records a graph recording all of the operations that created the data as you execute operations, giving you a directed acyclic graph whose leaves are the input tensors and roots are the output tensors. By tracing this graph from roots to leaves, you can automatically compute the gradients using the chain rule.
Internally, autograd represents this graph as a graph of Function objects (really expressions), which can be apply() ed to compute the result of evaluating the graph. When computing the forwards pass, autograd simultaneously performs the requested computations and builds up a graph representing the function that computes the gradient (the .grad_fn attribute of each torch.Tensor is an entry point into this graph). When the forwards pass is completed, we evaluate this graph in the backwards pass to compute the gradients.
analysis of grad_acc (accumulated grad)
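I used the following reference: Reference 3
AccumulateGrad
According to the reference, AccumulateGrad is the termination condition of the backprop computation. It is the node attached to a leaf tensor, and once backprop reaches it the gradient is accumulated into the leaf and that path ends.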
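A quick way to see both properties, the end of the path and the accumulation, is the sketch below. It reuses the expand_as trick from _register_hooks() to reach the node; the tensor values are made up for illustration.

```python
import torch

x = torch.ones(3, requires_grad=True)
grad_acc = x.expand_as(x).grad_fn.next_functions[0][0]

# The node has nothing after it: this is where a backprop path stops.
print(grad_acc.next_functions)

# The node keeps a reference to the leaf tensor it accumulates into.
print(grad_acc.variable is x)

# Two backward passes without zeroing: the gradients add up in x.grad.
(2 * x).sum().backward()
(2 * x).sum().backward()
print(x.grad)  # tensor([4., 4., 4.])
```

Each pass contributes a gradient of 2 per element, and because nothing resets x.grad in between, the second pass adds to the first.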
_make_hook()
Now let's look at _make_hook(). _register_hooks() uses it as follows.
grad_acc = p_tmp.grad_fn.next_functions[0][0]
grad_acc.register_hook(self._make_hook(p))
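So what does register_hook() do here? The documentation for the tensor version says:
The hook will be called every time a gradient with respect to the Tensor is computed. The hook should have the following signature: hook(grad) -> Tensor or None
Hooks are usually registered on a tensor, but this code attaches one directly to grad_acc, which is a graph node. For now, let's accept that a hook can also be attached to a node and move on.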
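To convince myself that a hook on grad_acc really fires, I tried a minimal sketch that follows the same steps as _register_hooks(). The calls list and the hook body are my own illustration, not Horovod code.

```python
import torch

p = torch.ones(3, requires_grad=True)
calls = []  # records p.grad each time the hook fires

# Same trick as _register_hooks(): reach the AccumulateGrad node via expand_as.
p_tmp = p.expand_as(p)
grad_acc = p_tmp.grad_fn.next_functions[0][0]

def hook(*ignore):
    # Runs after AccumulateGrad has written the gradient into p.grad.
    calls.append(p.grad.clone())

grad_acc.register_hook(hook)

(3 * p).sum().backward()
print(len(calls), calls[0])  # 1 tensor([3., 3., 3.])
```

The hook runs once per backward pass and p.grad is already filled in when it runs, which is why Horovod's hook can hand p.grad straight to an allreduce. Keeping a reference to grad_acc matters, and Horovod does the same by appending it to self._grad_accs.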
def _make_hook(self, p):
    def hook(*ignore):
        if p in self._handles and self._handles[p][0] is not None:
            if self._allreduce_delay[p] <= 0:
                raise AssertionError(
                    "Gradients were computed more than "
                    "backward_passes_per_step times before call "
                    "to step(). Increase backward_passes_per_step to "
                    "accumulate gradients locally.")
        assert not p.grad.requires_grad
        assert self._allreduce_delay[p] > 0
        handle, ctx = None, None
        self._allreduce_delay[p] -= 1
        if self._allreduce_delay[p] == 0:
            if self._groups is not None:
                group = self._p_to_group[p]
                self._group_counts[group] += 1
                if self._group_counts[group] == len(group):
                    handle, ctxs = self._grouped_allreduce_grad_async(group)
                    self._handles[group] = (handle, ctxs)
                    # Remove any None entries from previous no-op hook calls
                    for gp in group:
                        self._handles.pop(gp, None)
                    self._group_counts[group] = 0
                    return
            else:
                handle, ctx = self._allreduce_grad_async(p)
        self._handles[p] = (handle, ctx)
    return hook
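A brief summary:
- If p is in self._handles and self._handles[p][0] is not None, the gradient for p has already been computed.
- When the delay counter self._allreduce_delay[p] reaches zero, self._allreduce_grad_async(p) is run.
- The result is stored in self._handles[p] as a tuple.

self._handles
This attribute is defined as a dict in __init__().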
self._handles = {}
Two-level indexing such as self._handles[p][0] works because each value is a tuple.
import torch
x = torch.randn(4, 4, requires_grad=True)
handles = {}
handles[x] = ("handles","ctx")  # a tensor can be used as a dict key
print(type(handles[x]))
print(handles[x][1])
print(handles[x][0])
<class 'tuple'>
ctx
handles
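Putting the counter and the dict together, the bookkeeping inside hook() can be mimicked in plain Python. This is a toy model under my own assumptions: the counter starts at a made-up backward_passes_per_step of 2, a string stands in for the parameter, and fake_allreduce_async is a hypothetical stand-in for self._allreduce_grad_async(p).

```python
# Hypothetical toy model of the hook bookkeeping; no Horovod involved.
backward_passes_per_step = 2
allreduce_delay = {"w": backward_passes_per_step}  # countdown per parameter
handles = {}                                       # parameter -> (handle, ctx)

def fake_allreduce_async(p):
    # Stand-in for self._allreduce_grad_async(p).
    return f"handle-for-{p}", None

def hook(p):
    handle, ctx = None, None
    allreduce_delay[p] -= 1
    if allreduce_delay[p] == 0:
        # Only the last local backward pass launches the allreduce.
        handle, ctx = fake_allreduce_async(p)
    handles[p] = (handle, ctx)

hook("w")
print(handles["w"])  # (None, None)
hook("w")
print(handles["w"])  # ('handle-for-w', None)
```

The first call only records a placeholder, which is why the real hook checks self._handles[p][0] is not None before treating an entry as a launched allreduce.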
_allreduce_grad_async()
Next, let's look at self._allreduce_grad_async(), which runs when hook() is called.
def _allreduce_grad_async(self, p):
    if p.grad is None:
        # Gradient was not computed, but we still need to submit a tensor to allreduce
        # as one of the other ranks may have computed it (due to dynamic forward functions).
        #
        # NOTE: this will not work if the gradient is sparse and we perform an allgather.
        # Unfortunately, there doesn't appear to be a good way to detect that the parameter will
        # produce sparse gradients before computing the gradient.
        p.grad = p.data.new(p.size()).zero_()

    name = self._parameter_names.get(p)
    tensor = p.grad

    if p.grad.is_sparse:
        if self.sparse_as_dense:
            tensor = tensor.to_dense()
        else:
            return self._sparse_allreduce_grad_async(p, name)

    tensor_compressed, ctx = self._compression.compress(tensor)

    if self.op == Average:
        # Split average operation across pre/postscale factors
        # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
        prescale_factor = 1.0 / self.gradient_predivide_factor
        postscale_factor = self.gradient_predivide_factor
    else:
        prescale_factor = 1.0
        postscale_factor = 1.0

    handle = allreduce_async_(tensor_compressed, name=name, op=self.op,
                              prescale_factor=prescale_factor,
                              postscale_factor=postscale_factor,
                              process_set=self.process_set)
    return handle, ctx
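The pre/postscale split for op == Average in the code above is easier to follow with numbers. The sketch below uses made-up values (4 workers, one scalar gradient each, a predivide factor of 2) and models the extra 1 / size() step only as the code comment describes it; it does not call Horovod.

```python
# Hypothetical numbers: 4 workers, each holding one scalar gradient.
grads = [1.0, 2.0, 3.0, 6.0]
size = len(grads)
gradient_predivide_factor = 2.0

# Same split as in _allreduce_grad_async() for op == Average.
prescale_factor = 1.0 / gradient_predivide_factor
postscale_factor = gradient_predivide_factor

# Scale before the sum, then scale after it. The extra 1 / size mimics
# what the code comment says the C++ backend applies for Average.
summed = sum(g * prescale_factor for g in grads)
result = summed * postscale_factor / size

print(result)  # 3.0
```

The two factors cancel, so the result is still the plain mean; the factor only decides how much of the division happens before the sum rather than after it.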
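A brief summary:
- Even if the gradient is None, a zero-filled tensor is sent.
- handle holds the return value of allreduce_async_().
- ctx is the context returned by the compression step.

horovod/torch/mpi_ops/allreduce_async
The function is defined at the path above. Let's skip the incidental parts and look only at the core.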
def allreduce_async_(tensor, average=None, name=None, op=None,
                     prescale_factor=1.0, postscale_factor=1.0,
                     process_set=global_process_set):
    """
    A function that performs asynchronous in-place averaging or summation of the input
    tensor over all the Horovod processes.

    The reduction operation is keyed by the name. If name is not provided, an incremented
    auto-generated name is used. The tensor type and shape must be the same on all
    Horovod processes for a given name. The reduction will not start until all processes
    are ready to send and receive the tensor.

    Arguments:
        tensor: A tensor to reduce.
        average:
            .. warning:: .. deprecated:: 0.19.0

                Use `op` instead. Will be removed in v1.0.

        name: A name of the reduction operation.
        op: The reduction operation to combine tensors across different ranks. Defaults to
            Average if None is given.
        prescale_factor: Multiplicative factor to scale tensor before allreduce.
        postscale_factor: Multiplicative factor to scale tensor after allreduce.
        process_set: Process set object to limit this operation to a subset of
                     Horovod processes. Default is the global process set.

    Returns:
        A handle to the allreduce operation that can be used with `poll()` or
        `synchronize()`.
    """
    op = handle_average_backwards_compatibility(op, average)
    return _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor, process_set)


def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set: ProcessSet):
    ...
    function = _check_function(_allreduce_function_factory, tensor)
    try:
        handle = getattr(mpi_lib, function)(tensor, output, divisor,
                                            name.encode() if name is not None else _NULL, op,
                                            prescale_factor, postscale_factor, process_set.process_set_id)
    except RuntimeError as e:
        raise HorovodInternalError(e)
    _handle_map[handle] = (tensor, output)
    return handle


def _allreduce_function_factory(tensor):
    return 'horovod_torch_allreduce_async_' + tensor.type().replace('.', '_')


def _check_function(function_factory, tensor):
    function = function_factory(tensor)
    if not hasattr(mpi_lib, function):
        raise ValueError('Tensor type %s is not supported.' % tensor.type())
    if not tensor.is_contiguous():
        raise ValueError('Tensor is required to be contiguous.')
    return function
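The name-based lookup in the listing above can be reproduced with PyTorch alone. In this sketch FakeMpiLib is a hypothetical stand-in for mpi_lib that I made up; it exposes only one entry point so that both outcomes of the hasattr check are visible, and it says nothing about which tensor types the real library supports.

```python
import torch

def allreduce_function_name(tensor):
    # Mirrors _allreduce_function_factory(): dots become underscores.
    return 'horovod_torch_allreduce_async_' + tensor.type().replace('.', '_')

class FakeMpiLib:
    # Hypothetical stand-in for mpi_lib with a single float32 CPU entry point.
    def horovod_torch_allreduce_async_torch_FloatTensor(self, *args):
        return 0  # pretend handle

mpi_lib = FakeMpiLib()

for t in (torch.zeros(2), torch.zeros(2, dtype=torch.int64)):
    name = allreduce_function_name(t)
    # hasattr is the same test _check_function() uses to reject a tensor type.
    print(name, hasattr(mpi_lib, name))
```

The tensor's type string alone selects the function, so one Python wrapper can dispatch to a separate compiled entry point per tensor type.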
A brief summary:
- getattr(mpi_lib, function) fetches the function from mpi_lib.
- function is a name of the form horovod_torch_allreduce..
- _check_function validates that the function exists.

The next post will cover the step() function.