DDP - (1) IPC 통신

손기훈·2024년 8월 29일

IPC - Socket 통신

Reference

개요

DDP의 핵심은 생성된 프로세스 그룹의 각 프로세스들이 GPU를 점유한채로 신경망과 데이터 배치들을 받아 각각 학습을 진행하고 진행된 학습의 loss를 공유하면서 병렬적으로 분산학습을 진행한다는것이다. 이 때, 각 프로세스들이 데이터를 주고 받는 과정은 필수적이다. 따라서, DDP의 일부 기능 중 하나를 구현하기에 앞서서 IPC에 대한 이해를 하고 넘어가야 한다는 생각이 들었다.

IPC - Inter Process Communication

일반적인 프로그래밍에선 IPC를 마주치는 상황이 적을 것이다. 하지만, 여러 프로세스들이 데이터를 공유하고 처리를 해야하는 등의 상황에선 필수적이다(OS 레벨에서 이루어지는 프로세스의 데이터 교환 등). IPC에 대한 간단한 설명과 IPC를 사용하는 다양한 방법에 대해 설명할 것이다.

what is IPC?

IPC란 이름에서 알 수 있듯이, 프로세스간의 데이터 통신을 의미한다. 하지만, 사실 IPC가 프로그래밍과 데이터 처리에서 아주 자연스러운 개념은 아니다. 왜냐면, 기본적으로 프로그램은 실행되는 순간 해당 프로그램이 점유하는 메모리 공간을 배당받고 해당 메모리 공간 내에서 데이터를 처리하기 때문이다. 이는 각 프로그램의 독립성을 보장하기 위해서도 행해지는 조치인데, 하나의 프로그램이 연산 과정에서 다른 프로그램의 메모리의 데이터를 변경시키거나 훼손시키면 안 되기 때문이다. 하지만, 실제로는 데이터가 서로 다른 방식의 처리 과정이 필요하고 처리 된 데이터를 각 프로세스끼리 공유해야하는 경우들이 아주 왕왕 있다. 이런 경우에 필요한 방식이 바로 IPC이다.

Shared Memory

Shared Memory는 이름에서 바로 직관적으로 알 수 있듯이, 여러 개의 프로세스가 같은 메모리 공간을 점유하는 것이다. 이 경우에는 다른 프로세스 간에 실제로 데이터가 송수신 되는 처리가 될 필요없이 같은 메모리 공간에서 읽기/쓰기 처리를 하기 때문에 매우 빠른 데이터 처리가 가능하다. 하지만, 하나의 프로세스가 점유하고 있는 데이터에 대해서 다른 프로세스가 접근하는 것을 막아줘야 한다. 이유는 원래 각 프로세스끼리 독립된 메모리 공간을 갖는 이유와 같다.

Shared File System

하나의 프로세스가 파일을 쓰면 다른 프로세스들이 해당, 파일을 열어서 읽는 형태의 데이터 공유 방법이다. PUB/SUB 방식의 통신과 상당히 유사함을 알 수 있다.

Pipe

PIPE는 사실 Shared File System과 유사하다. 실제로는 Shared Memory에서 필수적으로 사용하는 Message Queue 방식을 파일 형태로 하여 한 쪽에서 쓰면 한 쪽에서 읽는 것이다. 다만 일반적인 파일이 아닌 OS의 커널에서 관리하는 특별한 형태의 파일이다.

Socket Communication

위의 방안들은 모두 하나의 호스트 머신에서 프로세스끼리 통신하는 과정이다. 만약, 같은 서버 클러스터 혹은 원격 상황에서 서로 다른 프로세스끼리 데이터를 공유해야하는 상황이 있다면 이는 필연적으로 네트워크를 통해 통신을 해야한다. 소켓 통신은 결국 tcp/udp 통신의 과정이긴 하지만, 실제로 직접적으로 소켓 통신을 가장 활발하게 사용하는 경우는 IPC가 요구되는 상황이다. 이제 자세히 소켓 통신 과정과 multi-client, multi-server의 상황에 대해 알아보자

Socket IPC

만약 같은 호스트 머신에 있는 상황에서 IPC를 해야하는 상황이라면 위의 방안들을 사용하는 편이 훨씬 유리하다. 하지만, 서로 다른 머신에서 IPC를 해야하는 상황에서는 socket으로 통신하여 IPC를 하는 방안이 유일하다.

다른 네트워크의 통신 방법처럼 소켓은 서버 클라이언트 구조로 통신을 취한다. 아래는 간단한 socket의 동작방식에 대한 설명이다.

socket 동작 방식

위의 동작 중에서 socket의 client의 내용을 먼저 살펴보자. 소켓 클라이언트의 코드는 아래와 같이 구현이 가능하다.

socket - client

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("www.python.org", 80))
  • AF_INET은 IPV4 통신을 의미하고, SOCK_STREAM은 TCP 통신을 의미한다.

위의 connect 메소드가 완료되면, 소켓이 request를 보낼 준비가 되었다는 뜻이다. 이 때 send 메소드를 통해 데이터를 보내게 된다. send 메소드를 통해 데이터를 보낸 후에 응답을 받게 되고, 다시 빈 문자열을 보내, 연결이 종료됐다는 신호를 보낸다 그 이후에는 소켓이 파괴된다. client의 소켓은 하나의 리퀘스트에 1번만 사용된다.

socket - server

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind((socket.gethostname(), 80))
serversocket.listen(5)

위의 코드는 accetpt 메소드를 호출하여 소켓에 데이터를 받을 준비를 하기 직전까지의 코드이다. 이때, bind의 인자로 연결할 주소를 설정할 수 있다. 이때 위와 같인 호스트 네임을 통해 받을 수도 있고, ‘localhost’, ‘127.0.0.1’ 등 으로 같은 호스트 머신 안에서만 통신이 되도록 할 수도 있다. s.bind(’’, 80)과 같이 머신이 가질 수 있는 모든 주소로 소켓을 연결하게 할 수도 있다. listen 메소드의 인자의 값은 큐의 사이즈를 정하는 인자이다. 초기값은 MAX로 설정되어 있다.

이제 실제로 서버가 데이터를 받고 실행되는 부분의 코드를 작성해보자.


# 1 threaded server 버전
while True:
    (clientsocket, address) = serversocket.accept()
    # in this case, we'll pretend this is a threaded server
    ct = client_thread(clientsocket) 
    ct.run()
    

위의 코드의 역할은 다음과 같다. 위의 루프를 실행시키기 위해서는 보통 3가지 방법이 사용되 수 있다.

  1. 스레드를 디스패칭하기
  2. clientsocket을 다룰 새 프로세스를 스폰하기
  3. nonblocking 모드로 재구성하기 등이 있다.

위의 예제는 클라이언트 소켓을 다루기 위한 스레드를 디스패칭하는 예제이다.

일단 위의 코드로부터 알아야할 것은, 위의 동작이 서버가 동작하는 방식의 전부라는 점이다. 데이터를 보내지도 않고, 받지도 않는다. 이 때 그저 새로운 소켓을 생성한다는 점이다. 이 새로 생성된 소켓은 클라이언트 소켓에 응답을 하는 역할을 한다. 우리가 포트와 아이피를 주어 생성한 소켓은 새로운 연결이 들어올지도 모르기 때문에, 계속 listen 상태에서 다른 연결이 들어오는 것을 대기하고 있다. 이 일련의 동작은 accept 메소드를 통해 이루어진다.


# 2 일반적인 커뮤니케이션 루프 버전
with clientsocket:
    while True:
        request = clientsocket.recv(1024)
        request = request.decode("utf-8") # convert bytes to string
       
        if not request:
            clientsocket.send("closed".encode("utf-8"))
            break

        print(f"Received: {request}")

위의 예제는 일반적으로 가장 많이 사용되는 서버 구현 코드이다. 이 떄 recv 메소드는 요청이 들어오는 버퍼의 사이즈를 설정하는 인자이다. 위의 1024의 의미이는 버퍼사이즈를 1024로 하겠다는 뜻이다. 해당 들어오는 데이터는 인코딩 된 데이터이므로 디코딩의 과정을 거친다.

socket - socket을 사용한다는 것의 정확한 의미

소켓을 이용한 통신에는 두 가지 메소드가 사용된다. recv와 send가 사용된다. file을 통한 io를 통해 소통하는 경우는 read와 write를 사용한다. read와 write를 사용하여 통신을 할 때는 file 자체가 일종의 버퍼로 작동하기 때문에 필수적으로 flush하는 기능을 넣어줘야한다. 만약 이때 , flush기능이 없다면 무한정 대기 상태로 들어갈 것이다.

이제 네트워크 버퍼를 사용하여 동작하는, recv와 send에 대해서 더 자세히 알아보자. 이 두 메소드는 실제로 통신과정에서 전달되는 모든 바이트를 다룰 필요는 없다. 이들의 관심사와 동작 방식은 철저히 네트워크 버퍼에 의존적이다. 네트워크 버퍼가 채워져 있거나(send) 혹은 네트워크 버퍼가 비워져있을 때(recv)에 동작한다. 이 때 이들이 반환하는 것은 메소드 호출 시에 다뤄진 데이터의 양 - 몇 바이트 - 에 대한 것이 전부이다. 따라서, 통신하고자 하는 데이터가 완전히 전송되기까지 위의 메소드들을 다시 호출하는 것은 철저히 코드를 짜는 사람의 역할이다.

이 때 recv 메소드가 0 byte를 반환했다면 이는 client소켓이 닫혔음을 의미하는 것이다. 이것은 send 메소드도 동일하다. 따라서, 소켓을 하나의 연결이 끝났음에도 재사용하고자 하는 것은 사용종료 상태가 없다는 뜻이다. 이 때 recv는 무한정 대기 상태로 들어가야 한다. 이를 통해 socket의 근본에 대해서 알 수 있다.

  1. 메세지는 고정된 길이여야 한다.
  2. 메세지는 구분되어 있어야 한다.
  3. 메세지의 길이를 알려줘야 한다.
  4. 메세지는 connection을 끊어야 한다.

메소드를 통해 좀 더 명확하게 소켓이 동작하는 기전에 대해 살펴보면 다음과 같다.

  • listen(): 소켓에 들어오는 클라이언트를 들어오는 것을 기다린다. 이 때 들어오는 요청을 작업 큐에 넣어둔다.
  • accept(): 현재 프로그램의 실행을 연결이 들어올 때까지 대기 시킨다. 큐에서 처리하기 위한 연결을 제거한다. 그 이후에 들어온 연결에 응답하기 위하여 새로운 소켓을 생성한다.
  • connect(): 원격 서버에 새로운 연결을 생성한다.
  • send(): 네트워크 버퍼가 차있을 때 동작한다. 타겟 소켓에 데이터를 보낸다. 이후에는, 보낸 데이터의 길이를 반환한다.
  • recv(): 네트워크 버퍼가 비워져 있을 때 동작한다. 원천 소켓에서 데이터를 받는다. 이 때 받은 데이터의 길이를 반환한다.
  • close(): 소켓을 닫음과 동시에 연결되어 있는 다른 쪽 소켓에 빈 문자열을 보내 소켓이 닫혔음을 알린다.

socket - Disconnecting

엄밀하게는 소켓을 close 하기 이전에 shutdown 동작을 먼저 해줘야 한다. 다른 쪽의 소켓에 이제 연결을 끊겠다고 미리 말하는 것이다. 하지만, 대부분 프레임워크에서는 이를 명시적으로 동작시키는 부분이 없다. 파이썬에선 가비지 콜렉터가 소켓 객체를 메모리에서 삭제할 때 자동적으로 ‘clsoe’를 해준다. 다만 이에 너무 의존하는 것은 안 좋은 습관이다.

socket - non-blocking

위의 보았다 시피 소켓은 send의 동작과 recv 동작이 실행될 때 해당 메소드가 동작하기 전까지 - 실제 받든 데이터를 처리하기 전까지 - 다른 작업을 멈추고 대기하게 된다. 이는 매우 직관적이고 편안하지만, 동시에 여러 개의 연결을 처리해야 할 때 두 가지 문제가 발생하게 된다.

  1. 동시에 여러 번의 연결을 어떻게 처리해야 하는가?
  2. 모든 데이터를 보내거나 받을 때까지 send와 recv를 호출해야 한다.

위의 문제를 해결하는 방식은 여러가지가 있다. 예를 들어 asyncio를 사용한다거나, threading을 사용한다거나 하는 방법이 있다. 하지만 이 때 방금 말한 threading과 asyncio를 사용하는 대신, 더 간단한 방법이 있다. 소켓을 non-blocking 모드로 생성하고 select 와 함께 사용하는 것이다.

socket - non-blocking과 blocking의 차이점

blocking과 non-blocking의 가장 큰 차이점은, 메소드의 동작방식이다. recv send connect accept 의 메소드들은 어떤 동작도 없이 반환할 수 있다. 이 때 받을 데이터가 존재하지 않으면 에러가 발생하게 된다. 이 때 발생가능한 경우 모두를 try… accet 으로 컨트롤하는 것은 거의 불가능에 가깝다.

여기서 select가 사용된다. select 는 다중입출력을 하고자 할 때 사용하는 다중입출력 모델이다. select 를 씀으로 인해서 select가 검사하는 이벤트의 발생을 대기할 수 있고 각 소켓이 준비됐을 때 읽고 쓸 수 있다. (실제 구현시에는 selector 모듈을 사용할 예정)

socket - multi-connection server

select의 동작 방식과 실제로 multi-connection 환경을 코드로 구현하면서 알아보자.

sel = selectors.DefaultSelector() # selector 객체 선언

host, port = sys.argv[1], int(sys.argv[2])

lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()

print(f"Listening on {(host, port)}")
lsock.setblocking(False) # do not block other connection

sel.register(lsock, selectors.EVENT_READ, data=None) # selector에 등록

위의 코드에서 sel.register 파트를 주목해보자. 먼저 listening을 담당하는 소켓을 등록해주고 있다. select가 확인하는 EVENT는 READ, WRITE, ERROR가 있지만 여기 listening을 담당하는 소켓은 들어오는 연결을 검사하는 역할만을 하므로 selectors.EVENT_READ만을 등록해주고 있다. 이 때 소켓을 따라 들어오는 데이터는 select 메소드가 반환할 때 함께 반환된다.

try:
    while True:
        events = sel.select(timeout=None)
        for key, mask in events:
            if key.data is None:
                accept_wrapper(key.fileobj)
            else:
                service_connection(key, mask)
except KeyboardInterrupt:
    print("Caught keyboard interrupt, exiting")
finally:
    sel.close()

위의 sel.select는 socket들이 준비가 될 때까지 blocking 하고 있다. 해당 메소드의 결과는 key, mask의 튜플을 반환한다. key는 selector key의 named tuple이고 fileobj 속성을 가지고 있다. 이 때 fileobj는 소켓객체이다. mask는 준비가 된 연산의 이벤트 마스크이다. 이 떄 mask는 int 값이다. None 일 때는 1의 값을 지니고 있다. selector.EVENT_READ 는 1, selector.EVENT_WRITE는 3의 값을 가지고 있다. MASK는 2 혹은 3의 값을 가지고 있다.

key.data가 none이라는 뜻은 해당 데이터가 listen하고 있는 서버라는 뜻이므로 connection을 받을 수 있도록 해줘야 한다. 직접 만든 accept_wrapper를 호출해서 새로운 소켓 객체를 만들고 selector에 등록시키자.


def accept_wrapper(sock):
    conn, addr = sock.accept()  # Should be ready to read
    print(f"Accepted connection from {addr}")
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data) # accept 이후에는 새로 소켓을 하나 생성해서 데이터를 받기 때문에 따라서 또 selector에 등록시켜줘야함

accept 동작으로 새로운 소켓이 스폰되므로 새로운 소켓 객체인 conn 역시 setblocking(false) 메소드로 블로킹 하지 않도록 해줘야 한다. 이제 해당 connection 에 이벤트인 READ와 WRITE 를 넣어준다. WRITE를 넣어주는 이유는 응답 시에 데이터를 써서 보내야하기 때문이다. 그리고 데이터에 client 주소와 받는 데이터와 나가는 데이터를 속성으로 하는 타입 객체(types.SimpleNamespace) 를 만들어서 데이터 인자에 바인딩해준다. 이로써 새로운 소켓 객체에 이벤트와 데이터를 바인딩할 준비를 마쳤다. 마지막으로, sel.register 를 통해 새로운 소켓객체를 selector가 감시할 수 있도록 등록해주면서 이벤트와 데이터를 객체에 바인딩 해준다.

자 위의 코드로부터 이제 key의 데이터가 None이 아니면 client 소켓이라는 것을 알 수 있게 되었다. 이제 그렇다면 client 소켓이 들어왔을 때 응답 부분과 처리 로직을 담당하는service_connection 을 작성해주자.


def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            data.outb += recv_data
        else:
            print(f"Closing connection to {data.addr}")
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print(f"Echoing {data.outb!r} to {data.addr}")
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]

selector.EVENT_READ 는 1, selector.EVENT_WRITE는 2의 값을 가지고 있다. MASK는 key.data == None 일 때는 1의 값을 가지고 있었다. 그리고 그 이후에는 2 혹은 3의 값을 가지고 있었다. 이 때의 값이 정해지는 이유를 잘 몰랐다.

조금 찾아보니 이유를 알 수 있었다. selector의 이벤트 플래그인 EVENT_READEVENT_WRITE 의 값은 bitmask인 상수로 16진수의 값으로 1과 2이다.

  • EVENT_READ : bitmask로 0b01 - 16진수 1
  • EVENT_WRITE : bitmask로 0b02 - 16진수 2

이 때 최초의 listening 서버의 경우 READ 이벤트만 바인딩 해줬으므로 해당 마스크가 준비됐을 때도 1의 값만을 가지게 되고, WRITE만 준비됐을 경우에는 2 그리고 다시 둘 다 준비가 됐을 때는 events= selectors.EVENT_READ | selectors.EVENT_WRITE 의 연산으로 인해 비트의 합연산이 이루어져 3의 값을 가지게 되는 것이었다.

그 이외에 나머지 저 코드를 설명하자면 recv 가 없을 경우에는 연결이 끊긴 경우이므로 selector에서 해제시켜주어야 하고, 만약 데이터가 있으면 echoing 서버이므로 그대로 data.outb에 써준다. 데이터가 쓸 준비가 되어있고 outb가 존재하는 경우라면 데이터를 send 해 줘야한다. 그리고 데이터의 outb는 다시 데이터가 전송됐으므로 보낸 길이만큼 다시 지우고 써준다.

socket - multi-client

소켓의 멀티 클라이언트는 서버의 구현과 유사하지만 약간 다르다. 아래의 구현코드를 보면서 살펴볼 수 있도록 하자.


import sys
import socket
import selectors
import types
import traceback

sel = selectors.DefaultSelector()
messages = [b"Message 1 from client.", b"Message 2 from client."]
host, port, num_conns = sys.argv[1], int(sys.argv[2]), int(sys.argv[3])

print(host, port, num_conns)

try:
    
    start_connections(host, port, num_conns)

    while True:
        events = sel.select(timeout=2)
        if not events:
            print("No events. Checking connections...")
            continue
        for key, mask in events:
            service_connection(key, mask)
except KeyboardInterrupt:
    print("Caught keyboard interrupt, exiting")

except Exception as e:
    print("ERROR occurred")
    # print(traceback.format_exc())
finally:
    sel.close()

위의 코드는 리스닝 서버가 필요한 상황이 아니기 때문에, 최초의 리스닝서버를 selector에 등록시켜주지 않아도 된다. 이 때, start_connection으로 여러 개의 커넥션을 생성하여 서버에 connect를 시켜준다.

start_connection의 코드를 알아보자.


def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print(f"Starting connection {connid} to {server_addr}")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(
            connid=connid,
            msg_total=sum(len(m) for m in messages),
            recv_total=0,
            messages=messages.copy(),
            outb=b"",
        )
        sel.register(sock, events, data=data)

똑같이 non-blocking모드로 소켓을 열어준다. 다만, 일반적인 클라이언트와 다른 점은 connect 메소드 대신 connect_ex 메소드를 사용하여 연결을 시도한다는 점이다. 왜냐하면 connect 메소드는 바로 blockin_io 에러를 일으기지만, connect_ex는 에러를 일으키는 대신 에러의 발생을 알려주기 때문이다. 서버의 select는 기본적으로 blocking 모드 (이는 해당 소켓이 준비 상태가 될 때까지만 blocking하는 것이다.) 이기 때문이다.

커넥션이 완료되면 똑같이 읽고 쓸 준비가 완료된 것이다. 이때 서버의 select 동작과 똑같이 데이터는 select가 반환할 때 함께 반환한다. 이 때 서버와 똑같이 저장하고 싶은 데이터는 types.SimpleNamespace 를 통해 생성하고 저장해야한다.

이 코드에서 유의해야할 것은 메세지를 copy해줘야 한다는 점이다. 각 커넥션이 send를 호출하고 리스트를 수정하기 때문이다. 소켓에서 가장 중요한 것은 모든 활동은 계속 추적되어야 한다는 점이다. 클라이언트가 보내야할 것, 받은 것, 메세지의 길이 등등. 이러한 모든 정보를 data 객체에 담아야한다.

이제 service_connection을 작성해보자.


def service_connection(key, mask):
     sock = key.fileobj
     data = key.data
     if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            print(f"Received {recv_data!r} from connection {data.connid}")
            data.recv_total += len(recv_data)

        if not recv_data or data.recv_total == data.msg_total:
            print(f"Closing connection {data.connid}")
            sel.unregister(sock)
            sock.close()
     if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print(f"Sending {data.outb!r} to connection {data.connid}")
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]

위의 서비스 구현 코드는 서버의 구현코드와 크게 다르지 않음을 알 수 있다. 다만 받은 메세지의 길이가 원래 보낸 메세지의 길이와 맞지 않으면, 소켓의 등록을 해제하고 종료하는 것을 볼 수 있다.

profile
파이썬과 함께라면 두렵지 않아

0개의 댓글