이 페이지는 실제 DDP의 부분을 구현하기 보단 원리를 구현하기 위해 작성한 코드에 대한 설명이다. 해당 코드는 numpy와 socket 을 사용하여 ddp의 기본적인 동작 방식과 원리를 구현하였다. 실제로는 DDP에서 데이터를 통신하고 받고 reduction 연산을 거치고 데이터의 상태를 균일하게 맞추는 과정은 Ring_AllReduce를 통해 이루어지지만, 이 포스트에서는 ALLReduce를 사용하여 DDP를 하는 과정을 구현하였다.
구현하기에 앞서 DDP의 과정을 복기하면서 구현할 파트가 어떻게 되는지를 하나하나 살펴보자.
자 각 단계별로 필요한 과정을 살펴보고 어떻게 구현해야 할지 살펴보자. 위의 순서를 기억하면서 다시 all_reduce를 묘사한 그림을 살펴보자.

자 위의 그림에서 Rank0과 Rank1,2,3 사이에서 데이터 교환이 일어나고 있고, 데이터를 rank0이 가져갔다가 다시 뿌리는 과정이 존재하고 있다. 1)의 과정에서 프로세스를 스폰하고 2) 모델을 브로드 캐스팅 하는 과정과 함께 생각해보면, 실제로는 Rank0이 Rank1,2,3을 스폰하고, Rank1,2,3 에 데이터를 보내는 역할임을 알 수 있다. 즉 Rank0은 데이터 처리를 하고 리퀘스트를 보내는 클라이언트 입장이고 Rank1,2,3이 서버의 역할을 하여 처리된 데이터를 반환 해야함을 알 수 있다.
따라서 위의 순서를 고려하여 설계를 하면 클라이언트(마스터) → Rank0, 서버(슬레이브) → Rank1,2,3의 역할을 해야함을 알 수 있다. 즉 클라이언트 역할을 할 파트와 서버 역할을 이행할 파트 둘로 나누어 구현되어야 한다. 이 부분을 그림으로 그려보면 다음과 같다.

실제로는 all_reduce_client 파트에서도 forward pass가 일어나지만, 이해를 돕기 위해 reduction 연산안에 포함되어 있는 걸로 간주하면 좋을 것 같다.
위의 DDP의 과정을 이제 클라이언트와 서버의 역할로 나누어 구분해보자.
자 위의 순서를 이제 다시 그림에 붙여서 보자.

일단 전체코드를 한 번 보고 이후에 섹션별로 설명을 하도록 하겠다. 클라이언트/서버 전체코드는 DDP-AllReduce 해당 리포에서 확인 가능하다.
import selectors
import socket
import types
import os
import signal
import traceback
import time
import numpy as np
from subprocess import Popen
class AllReduce(object):
def __init__(self, num_proc, host='127.0.0.1', start_port=25000):
self.num_proc = num_proc
self.host = host
self.start_port = start_port
self.pids = []
# self.selector = None
self.selector = selectors.DefaultSelector()
# data for communication
self.own_chunk = None
self.own_network = None
self.forward_data = []
self.reduction_result = None
# shape for the chunk
self.shape = None
def __del__(self):
print(f"pid : {self.pids}")
self.selector.close()
for pid in self.pids:
os.kill(pid, signal.SIGINT)
def spawn_processes(self):
for i in range(1, self.num_proc):
sub = Popen(['python', 'distributed_process.py', self.host, str(self.start_port + i)])
self.pids.append(sub.pid)
def start_connections(self):
for connid in range(1, self.num_proc):
server_addr = (self.host, self.start_port + connid)
print(f"Starting connection {connid} to {server_addr}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect_ex(server_addr)
except BlockingIOError:
pass
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(
connid=connid,
msg_total=0,
recv_total=0,
messages=[],
outb=b"",
)
self.selector.register(sock, events, data=data)
# 데이터를 읽고 쓰는 부분
def service_connection(self, key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data == b'2': # 더미 데이터가 들어오면
print(f"Received {recv_data} from connection {data.connid}")
data.recv_total += len(recv_data)
print(f"Closing connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if recv_data == b'1':
print(f"[BroadCasting] success, {recv_data} from connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if len(recv_data) > 1:
print(f"[Scatter] success, {len(recv_data)} from connection {data.connid}")
forward_result_data = np.frombuffer(recv_data, dtype='float32')
forward_result_data = forward_result_data.reshape(self.shape)
self.forward_data.append(forward_result_data)
self.selector.unregister(sock)
sock.close()
if not recv_data:
print(f"Closing connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print(f"Sending {len(data.outb)} to connection {data.connid}")
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
def conmmunication_loop(self):
try:
while True:
events = self.selector.select(timeout=2)
if not events:
print("No events. Checking connections...")
continue
for key, mask in events:
self.service_connection(key, mask)
except KeyboardInterrupt:
print("Caught keyboard interrupt, exiting")
except OSError:
# print(traceback.format_exc())
pass
except Exception as e:
print("ERROR occurred")
print(traceback.format_exc())
finally:
print("[info] this communication is done")
# self.selector.close()
def broadcast(self, data, type_):
self.start_connections()
self.own_network = data
byte_data = data.tobytes()
protocol = self.make_protocol(byte_data, type_, size=data.shape)
# print("length of selector :", len(self.selector.get_map().values()))
for key in self.selector.get_map().values():
# print("key :", key)
key.data.messages.append(protocol)
key.data.msg_total = len(protocol)
self.conmmunication_loop()
def scatter(self, data):
print("[scatter] started scatter pattern")
self.start_connections()
unit = len(data) // self.num_proc
self.own_chunk = data[0 : unit]
for key in self.selector.get_map().values():
# make protocol
data_ = data[unit:unit+unit].tobytes()
protocol = self.make_protocol(data_, 1, data[unit:unit+unit].shape)
#bind bytes data to message
key.data.messages.append(protocol)
key.data.msg_total = len(protocol)
unit += unit
self.conmmunication_loop()
# 실제 reduction 연산의 대상은 forwardpass 이후의 loss 값이다. 여기서는 간단하게 내적 연산으로 대체한다.
def reduction_op(self):
#own data result
master_forward = np.dot(self.own_chunk, self.own_network)
master_forward = master_forward.astype('float32')
self.forward_data.append(master_forward)
self.reduction_result = np.mean(self.forward_data, axis=0)
return self.reduction_result
def make_protocol(self, data, type_,size=(4,4)):
type_ = str(type_).encode()
self.shape = size
size_of_shape = str(len(size)).encode()
shape = ''.join(map(str,size)).encode()
bytes_data = type_ + size_of_shape + shape + data
return bytes_data
def main():
# initialize all_reduce class
all_reduce = AllReduce(3)
# spawn processes for distributed programming
all_reduce.spawn_processes()
# broadcast nueral network
network = np.random.random((4,4)).astype('float32')
all_reduce.broadcast(network, type_=0)
# scatter the batch data
batch = np.random.random((15, 4, 4)).astype('float32')
all_reduce.scatter(batch)
# all_reduce the batchdata
reduction_loss_value = all_reduce.reduction_op()
all_reduce.broadcast(reduction_loss_value, type_=2)
print("[Reduction] result of reduction op is broadcasted")
# quit all the process
quit_signal = np.array([])
all_reduce.broadcast(quit_signal, type_=3)
if __name__ == "__main__":
main()
약 200줄이 조금 넘어가는 코드로 구현할 수 있었다. 물론 이 코드는 실제로 학습을 진행하기 위한 코드가 아닌 구현과정과 원리를 이해해보는 코드이기 때문에 실제로 loss를 구하는 과정과 optimizing의 과정은 배제한체 코드가 구현되어 있다. 실제로 DDP를 할 때는 위와 유사한 과정을 통해 이루어진다고 이해하면 좋을 듯 하다.
자 먼저 main 코드 부터 살펴보자
def main():
# initialize all_reduce class
all_reduce = AllReduce(3)
# spawn processes for distributed programming
all_reduce.spawn_processes()
# broadcast nueral network
network = np.random.random((4,4)).astype('float32')
all_reduce.broadcast(network, type_=0)
# scatter the batch data
batch = np.random.random((15, 4, 4)).astype('float32')
all_reduce.scatter(batch)
# all_reduce the batchdata
reduction_loss_value = all_reduce.reduction_op()
all_reduce.broadcast(reduction_loss_value, type_=2)
print("[Reduction] result of reduction op is broadcasted")
# quit all the process
quit_signal = np.array([])
all_reduce.broadcast(quit_signal, type_=3)
먼 저 AllReduce의 인스턴스는 프로세스의 개수를 인자로 받는다. 그리고 해당 프로세스의 개수 -1 개 만큼의 서브프로세스를 spawn_processes() 를 통해 생성한다. 나머지 파트는 우리가 위의 그림에서 그린 대로 broadcasting(model) - scatter(batch) - broadcasting(reduction result)를 해주고 있음을 볼 수 있다.
이제 생성파트와 소멸 파트 및 구현파트를 살펴보자.
class AllReduce(object):
def __init__(self, num_proc, host='127.0.0.1', start_port=25000):
self.num_proc = num_proc
self.host = host
self.start_port = start_port
self.pids = []
# self.selector = None
self.selector = selectors.DefaultSelector()
# data for communication
self.own_chunk = None
self.own_network = None
self.forward_data = []
self.reduction_result = None
# shape for the chunk
self.shape = None
def __del__(self):
print(f"pid : {self.pids}")
self.selector.close()
for pid in self.pids:
os.kill(pid, signal.SIGINT)
def spawn_processes(self):
for i in range(1, self.num_proc):
sub = Popen(['python', 'distributed_process.py', self.host, str(self.start_port + i)])
self.pids.append(sub.pid)
init
위의 생성 파트에서 multiclient-socket 구현을 위해 non-blocking 소켓을 만들어야 한다. 그리고 해당 입출력을 제어하기 위한 selector 오브젝트의 생성과 rank0 프로세스인 클라이언트도 내부적으로 갖고있는 배치와 신경망의 연산을 하기 위해 저장하는 변수가 존재한다는 사실이다.
spawn_processes
spawn_processes 의 경우 프로세스를 subprocess 모듈을 사용하여 비동기적으로 스폰하고 해당 프로세스의 self.pids 의 속성에 저장하는 과정을 가지고 있다.
소멸자의 경우 소멸 시에 selector 오브젝트를 종료하고, 스폰시에 수집한 pid들을 통해 기존에 스폰한 프로세스들을 종료하는 과정을 가진다.
자 이제 실제 데이터를 전달하는 braodcast, scatter, 그리고 서버에서 데이터를 받았을 때 해당 데이터가 모델인지 배치데이터인지, reduction 결과물인지에 대해 구분을 해야하므로, 약간의 프로토콜을 만들어주는 make_protocol에 대해서 살펴보자.
def broadcast(self, data, type_):
self.start_connections()
self.own_network = data
byte_data = data.tobytes()
protocol = self.make_protocol(byte_data, type_, size=data.shape)
# print("length of selector :", len(self.selector.get_map().values()))
for key in self.selector.get_map().values():
# print("key :", key)
key.data.messages.append(protocol)
key.data.msg_total = len(protocol)
self.conmmunication_loop()
def scatter(self, data):
print("[scatter] started scatter pattern")
self.start_connections()
unit = len(data) // self.num_proc
self.own_chunk = data[0 : unit]
for key in self.selector.get_map().values():
# make protocol
data_ = data[unit:unit+unit].tobytes()
protocol = self.make_protocol(data_, 1, data[unit:unit+unit].shape)
#bind bytes data to message
key.data.messages.append(protocol)
key.data.msg_total = len(protocol)
unit += unit
self.conmmunication_loop()
def make_protocol(self, data, type_,size=(4,4)):
type_ = str(type_).encode()
self.shape = size
size_of_shape = str(len(size)).encode()
shape = ''.join(map(str,size)).encode()
bytes_data = type_ + size_of_shape + shape + data
return bytes_data
broadcast, scatter
소켓 통신은 한 번 연결이 되고 통신이 되고 나면 소켓이 파괴가 되기 때문에, 매 연결시에 커넥션을 새로해주어야 하기 때문에 , broadcasting 과 scatter 모두 start_connection을 메소드의 동작시 제일 먼저 해주는 것을 볼 수 있다. broadcsting과 scatter 모두 rank0에서 처리해야할 데이터를 따로 저장해준 후에 selector에 등록된 소켓의 데이터 속성에 for key in self.selector.get_map().values(): 반복문을 통해 값을 넣어주고 있음을 알 수 있다.
그 이후에는 self.conmmunication_loop() 메소드를 통해 selector의 등록된 소켓을 서버와 통신한다.
이 때 모델을 먼저 broadcasting하고 다시 배치 데이터를 보내고 최종적으로 reduction 연산의 결과를 보내는 것을 확인할 수 있다. 모델의 경우에는 간단한 구현을 위해 단순한 행렬을 만들어 전송하였다.
make_protocol
REST api처럼 엔드포인트를 지정하여 통신을 하고 있는 경우가 아닌, 하나의 소켓을 열어두고 지속적으로 통신을 하는 경우다 보니, 서버에서 수신한 데이터가 모델인지, 데이터 배치인지, 아니면 reduction 결과 인지등을 구분해야 한다. 이 때 서버에서 데이터를 구분하고 각 데이터의 길이와 형태를 디코딩해줄 수 있도록 일종의 헤더를 만들어야 한다. bytes_data = type_ + size_of_shape + shape + data 를 통해서 type_ 는 (0,1,2) 중의 하나의 값으로 (모델,배치,reduction) 구분할 수 있도록 하였고, size_of_shape을 통해 서로 전달하는 np.frombuffer로 행렬의 값을 복원할 때 사용할 수 있도록 하였다.
이제 start_connection과 communication loop 메소드에 대해 알아보자. 이 둘과 전반적인 multiconnection socket에 대한 내용은 IPC에 관한 포스팅(IPC - Socket 통신 )에 자세하게 설명되어 있다. 여기서는 multiconnection과 무관한 DDP에 사용되는 전반적인 과정의 안에서 다루도록한다.
def start_connections(self):
for connid in range(1, self.num_proc):
server_addr = (self.host, self.start_port + connid)
print(f"Starting connection {connid} to {server_addr}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect_ex(server_addr)
except BlockingIOError:
pass
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(
connid=connid,
msg_total=0,
recv_total=0,
messages=[],
outb=b"",
)
self.selector.register(sock, events, data=data)
# 데이터를 읽고 쓰는 부분
def service_connection(self, key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data == b'2': # 더미 데이터가 들어오면
print(f"Received {recv_data} from connection {data.connid}")
data.recv_total += len(recv_data)
print(f"Closing connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if recv_data == b'1':
print(f"[BroadCasting] success, {recv_data} from connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if len(recv_data) > 1:
print(f"[Scatter] success, {len(recv_data)} from connection {data.connid}")
forward_result_data = np.frombuffer(recv_data, dtype='float32')
forward_result_data = forward_result_data.reshape(self.shape)
self.forward_data.append(forward_result_data)
self.selector.unregister(sock)
sock.close()
if not recv_data:
print(f"Closing connection {data.connid}")
self.selector.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print(f"Sending {len(data.outb)} to connection {data.connid}")
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
def conmmunication_loop(self):
try:
while True:
events = self.selector.select(timeout=2)
if not events:
print("No events. Checking connections...")
continue
for key, mask in events:
self.service_connection(key, mask)
except KeyboardInterrupt:
print("Caught keyboard interrupt, exiting")
except OSError:
# print(traceback.format_exc())
pass
except Exception as e:
print("ERROR occurred")
print(traceback.format_exc())
finally:
print("[info] this communication is done")
# self.selector.close()
start_connection
start_connection 파트는 크게 설명할 부분이 없다. 대부분 위의 링크된 IPC 포스트에서 설명이 되어있기 때문이다.
communication_loop
communication_loop 역시 IPC 포스트에 대부분 설명이 되어있기 때문에 넘어가도록 한다.
service_connection
먼저 첫번쨰 if문인 if recv_data == b'2': 의 경우는 디버깅을 위한 부분이었으므로 건너 뛰기로 한다.
브로드캐스팅에 성공했을 경우 서버에서 b’1’을 반환하도록 했다. 따라서, if recv_data == b'1': 두 번째 if문의 내용은 브로드캐스팅에 성공했을 경우 동작하도록 되어있는 파트이다. 브로드캐스팅의 경우에는 서버에서 다시 돌려줘야할 특별한 값이 없으므로, 특별한 처리 없이 바로 소켓을 해제한다.
스캐터에 성공했을 경우는 forward pass의 결과물(실제로는 loss value)이 바로 다시 반환되므로, 길이가 1보다 크면 바로 해당 if문으로 진행하도록 했다. if len(recv_data) > 1: 세번째 if문이다. 이 떄는 rank0, rank1, rank2 … 의 연산결과를 모두 모아 reduction 연산을 진행한다. 그 이후에 소켓을 해제한다.
이제 최종적으로 reduction_op 메소드를 살펴보자.
def reduction_op(self):
#own data result
master_forward = np.dot(self.own_chunk, self.own_network)
master_forward = master_forward.astype('float32')
self.forward_data.append(master_forward)
self.reduction_result = np.mean(self.forward_data, axis=0)
return self.reduction_result
reduction_op
위의 과정은 사실 특별한 내용은 없다. 클라이언트에서 forward_pass를 진행해주고, 그 이후에 scatter 연산 이후에 반환받은 forward_pass 값을 함께 모아 reduction 연산 - 여기에선 평균을 내주었다 - 진행하는 것이 전부이다.
그리고 main 함수에서는 이 결과를 다시 서버로 broadcasting 한다.
위의 구현 파트에서 빠진 건 각 프로세스에서 backwardpass와 optimizing하는 파트이다. ddp의 전반적인 통신과정을 알아보기 위한 거였던 것인 만큼 구현에선 빠트렸다.
서버의 구현코드는 클라이언트보다 상대적으로 짧다.
import numpy as np
import socket
import logging
from logging.handlers import RotatingFileHandler
import sys
import traceback
class DistributedProcess(object):
def __init__(self, host, port) -> None:
self.host = host
self.port = port
self.network = None
self.input_datachunk = None
self.reduction_data = None
self.logger = None
self.set_logger()
def __del__(self):
self.logger.info("DistributedProcess object is deleted")
def set_logger(self):
process_id = self.port - 25000
self.logger = logging.getLogger(f'subprocess_{process_id}')
self.logger.setLevel(logging.INFO)
handler = RotatingFileHandler("subprocesses.log", maxBytes=10000, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def open_socket_server(self):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((self.host, self.port))
s.listen()
self.logger.info(f"Socket opened on {self.host}:{self.port}")
while True:
conn, addr = s.accept()
with conn:
self.logger.info(f"Connected by {addr}")
while True:
data = conn.recv(1024)
self.logger.info(f"Received data: {data}")
if not data:
break
if data[0:1] == b'0':
self.network = self.decode_protocol(data[1:])
self.logger.info(f"Received network from {addr}, network : \n{self.network}")
conn.send(b"1")
elif data[0:1] == b'1':
self.logger.info(f"Received data chunk from {addr}")
self.input_datachunk = self.decode_protocol(data[1:])
result_forward = self.forward_datachunk()
conn.sendall(result_forward)
elif data[0:1] == b'2':
self.logger.info(f"Received reduction data from {addr}")
self.reduction_data = self.decode_protocol(data[1:])
self.logger.info(f"Reduction Data : \n{self.reduction_data}")
conn.send(b"1")
elif data[0:1] == b'3':
s.close()
sys.exit()
except Exception as e:
self.logger.error(f"Exception: {traceback.format_exc()}")
def decode_protocol(self, recv_data):
len_of_shape = int(recv_data[0:1].decode())
shape = tuple(map(int, recv_data[1:1+len_of_shape].decode()))
buffer_array_data = np.frombuffer(recv_data[1+len_of_shape:], np.float32)
buffer_array_data = buffer_array_data.reshape(shape)
self.logger.info(f"buffer_array_data : \n {buffer_array_data}")
return buffer_array_data
def forward_datachunk(self):
result_forward = np.dot(self.input_datachunk, self.network)
result_forward = result_forward.astype('float32')
self.logger.info(f"shape : {result_forward.shape}")
result_forward = result_forward.tobytes()
self.logger.info(f"bytes_length : {len(result_forward)}")
return result_forward
if __name__ == "__main__":
host, port = sys.argv[1], int(sys.argv[2])
process = DistributedProcess(host, port)
process.open_socket_server()
먼저 set_logger는 위의 프로세스들은 서브프로세스로 오픈이 되기 때문에 프린트 문을 찍어서 디버깅을 할 수 없기에, 각 로거 메소드를 가지고 하나의 로그 파일에 같이 쓰도록 하였다.
로거를 제외한 총 3개의 메소드를 가지고 있다. decode_protocol , forward_datachunk , open_socket_server 이 중에서 decode_protocol과 forward_datachunk의 경우는 특별한 내용은 없고 충분히 위의 client 파트만 봐도 이해가 되는 내용이기 때문에, 넘어가도록 한다.
open_socket_server
정석대로라면 원래 서버 역시 non-blocking 서버로 열어야 한다. 그래야 소켓을 지속적으로 열어두고 계속해서 연결되는 다양한 통신에 응답할 수 있기 때문이다. 하지만, 최대한 간단한 구현과 실제로 진행되는 순전파/역전파 연산시에는 blocking이 되어야 하므로, client 소켓의 종료 신호(sock.close()메소드가 호출되고 나서 전송하는 빈 문자열)를 받고 소켓이 종료되지 않고, 강제적으로 while True 문을 통해 열어놓을 수 있도록 하였다.
def open_socket_server(self):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((self.host, self.port))
s.listen()
self.logger.info(f"Socket opened on {self.host}:{self.port}")
while True:
conn, addr = s.accept()
with conn:
self.logger.info(f"Connected by {addr}")
while True:
data = conn.recv(1024)
self.logger.info(f"Received data: {data}")
if not data:
break
if data[0:1] == b'0':
self.network = self.decode_protocol(data[1:])
self.logger.info(f"Received network from {addr}, network : \n{self.network}")
conn.send(b"1")
elif data[0:1] == b'1':
self.logger.info(f"Received data chunk from {addr}")
self.input_datachunk = self.decode_protocol(data[1:])
result_forward = self.forward_datachunk()
conn.sendall(result_forward)
elif data[0:1] == b'2':
self.logger.info(f"Received reduction data from {addr}")
self.reduction_data = self.decode_protocol(data[1:])
self.logger.info(f"Reduction Data : \n{self.reduction_data}")
conn.send(b"1")
elif data[0:1] == b'3':
s.close()
sys.exit()
except Exception as e:
self.logger.error(f"Exception: {traceback.format_exc()}")
open_socket_server 메소드의 역할은 사실 소켓을 지속적으로 열어두고 대기한 상태에서 client의 보내는 요청에 맞추어 동작을 진행하는 것이다. 사실 잘 짜여진 코드는 아니다. 중첩된 while True문만 보더라도 그렇다. 하지만, 이 편이 최대한 간단한 구현이었다. 그리고 각 서버에서는 recv 메소드의 동작 이후에 미리 클라이언트와 맞추어놓은 프로토콜에 따라 동작(model copy, forward_pass)을 수행하는 것을 볼 수 있다.
DDP 파트를 공부하면서,분산처리와 동시처리 시에 필요한 IPC 의 개념과 다양한 collective 패턴을 배울 수 있었다. 이 번 all_reduce 파트의 구현은 두 개념을 다 사용하여 구현한 결과물이었다. 비록 구현 중에 쉽고 편한 구현을 위해 꼼수(?)를 사용하긴 하였지만, DDP의 핵심 원리를 이해할 수 있는 좋은 기회였다. 특히 소켓을 통해 IPC를 해보는 경험을 해보았고 좀 더 낮은 레벨의 프로그래밍과 DDP를 체화할 수 있었다.