[python]Shared Memory

YongHo·2024년 3월 25일

이번에 급하게 프로세스간 통신(IPC)을 진행하게 되었는데, 솔직하게 처음에는 이걸 내가 할 수 있을까? 라는 막막한 생각이 좀 컸던 것 같다. 시작 전에 결론부터 말하면 생각보다 정말 간단해서 좀 놀라웠다!

0. 프로세스란?

운영체제가 프로그램을 실행하기 위해 필요한 가장 작은 단위의 쓰레드, 메모리, 소스 코드들의 집합이며, 프로그램 동작 그 자체를 의미한다. 스케줄링이 되는 작업(task)와 같은 의미로 쓰인다.
쉽게 설명해보자면 '작업관리자 - 프로세스' 를 확인해보면 된다.
말 그대로 실행중인 프로그램들이라고 보면 된다!

프로세스에 대해서 설명하자면 정말 깊게 들어가기도 하고, 아직 많이 알지 못해 그 부분은 생략하고 넘어가겠다.


위 그림과 같이 프로세스 간에는 서로 다른 메모리 영역을 할당받게 된다.
즉, 일방적인 방법으로는 서로 데이터를 공유할 수 없다는 것이다. 그러므로 프로세스간 통신을 하려면 통신을 위한 별도의 공간을 만들어주어야 한다. 이를 위해 커널 영역에서는 IPC(Inter Process Communication)라는 내부 프로세스 간 통신을 지원하게 되고, 프로세스는 커널이 제공하는 IPC설비를 이용해 프로세스간 통신을 하게 된다.

프로세스간 통신(IPC)은 기본적으로 동일한 컴퓨터 시스템 내에서 실행되는 프로세스들 사이에 데이터를 주고받는 매커니즘을 말하지만, IPC 개념을 확장하여, 네트워크를 통해 서로 다른 컴퓨터 시스템에서 실행되는 프로세스들 간의 통신도 포함될 수 있다.

0-1. 컴퓨터 내부에서만 가능한 프로세스 통신

  • 메시지 큐 (Message Queues): 메시지를 저장하는 큐에 데이터를 전송하여 프로세스 간 비동기 통신을 지원한다.
  • 파이프 (Pipes): 일방향 통신 채널로, 한 프로세스에서 다른 프로세스로 데이터 스트림을 전송한다.
  • 공유 메모리 (Shared Memory): 둘 이상의 프로세스가 접근할 수 있는 메모리 영역을 생성하여 데이터를 공유한다.

0-2. 외부 컴퓨터도 가능한 프로세스 통신

  • 소켓 프로그래밍 (Socket Programming): TCP/IP와 같은 네트워크 프로토콜을 사용하여 다른 시스템의 프로세스와 데이터를 교환하며 소켓은 네트워크 통신의 기본적인 단위로, 서버와 클라이언트 모델을 사용하여 통신을 구현한다.
  • 원격 프로시저 호출 (Remote Procedure Call, RPC): 네트워크를 통해 다른 컴퓨터의 프로세스에서 함수나 프로시저를 호출할 수 있게 하는 프로토콜이다.

오늘은 이 중에서 Shared Memory에 대한 예제 코드를 작성해보았다.

1. SharedMemory란?

말 그대로 메모리를 공유하는 것이다. SharedMemory는 새로운 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결한다.
각 공유 메모리 블록에는 고유한 이름이 할당된다. 이러한 방식으로 하나의 프로세스는 특정 이름을 가진 공유 메모리 블록을 만들 수 있고 다른 프로세스는 동일한 이름을 사용해 동일한 공유 메모리 블록에 연결할 수 있다.

shared_mem = SharedMemory(name = 'EXAM_SharedMemory', size = 1024, create = True)

공유 메모리는 고정된 크기를 가지며 바이트 데이터를 저장하게 된다. Python은 바이트 배열로 변한되어 SharedMemory에 저장되고, 바이트의 배열로 읽고 다시 Python Type으로 변환할 수 있다. 이는 queue 혹은 pipe같이 메시지 전달을 통해 데이터를 공유하는 것보다 빠르고 효율적이라고 한다!

"Processes are conventionally limited to only have access to their own process memory space but shared memory permits the sharing of data between processes, avoiding the need to instead send messages between processes containing that data. Sharing data directly via memory can provide significant performance benefits compared to sharing data via disk or socket or other communications requiring the serialization/deserialization and copying of data."

아래 링크 글에서 가져온 글이다. 맞는 번역일지는 모르겠지만, 일반적으로 프로세스는 자신의 메모리 공간에만 액세스 할 수 있도록 제한되어 있지만 공유 메모리는 프로세스 간 데이터를 공유 허용하기 때문에 메시지를 보낼 필요가 없다고 한다.
그래서 다른 통신에 비해 상당한 이점을 얻을 수 있다고 한다!
https://superfastpython.com/multiprocessing-sharedmemory/#How_to_Use_SharedMemory

2. SharedMemory 사용하기

2-1. sharedmemory 생성하기

# 공유 메모리 생성
shared_mem = SharedMemory(name = 'EXAM_SharedMemory', size = 1024, create = True)

SharedMemory클래스의 인스턴스를 생성할 때, 사용한 매개변수는 다음과 같은 의미를 가진다.

  • name: 공유 메모리 객체의 이름입니다. 이 이름을 사용하여 다른 프로세스에서 동일한 공유 메모리 객체에 접근할 수 있다.
  • size: 공유 메모리 영역의 크기입니다. 이 크기는 바이트 단위로 지정된다.
  • create: 이 매개변수가 True로 설정되면 새로운 공유 메모리 객체를 생성한다. 이미 존재하는 이름을 사용하여 객체를 생성하려고 하면 에러가 발생할 수 있다.

2-2. sharedmemory 데이터 Read/Write

# 공유 메모리 데이터 쓰기
data = b'Hello My name is Youngho'
shared_mem.buf[ : len(data) + 1] = data

# 공유 메모리 데이터 읽기
size = shared_mem.size
data = bytes(shared_mem.buf[ : size]).decode('utf-8').rstrip('\x00')
'''

'shared_mem.buf'는 SharedMemory 인스턴스에서 사용할 수 있는 속성으로, 공유 메모리의 버퍼에 대한 직접적인 접근을 제공한다.
'shared_mem.buf'는 'memoryview'타입의 객체를 반환한다. 'memoryview'객체는 Python 내장 타입으로, 바이너리 데이터 메모리를 보는 방식을 제공한다.
이를 통해 배열이나 바이트와 같은 바이너리 데이터 구조체를 복사하지 않고도 메모리를 직접 조작할 수 있게 된다.

  • 데이터 쓰기 : 'buf'에 바이트 데이터를 할당함으로, 공유 메모리에 데이터를 쓸 수 있다. 위 코드는 공유 메모리에 data byte 크기만큼 데이터를 넣어주는 것이다.
  • 데이터 읽기 : 'buf'에서 데이터를 읽기 위해 슬라이싱이 가능하다. 위 그림과 같이 [:size]는 size 크기만큼 공유 메모리의 바이트를 읽어 'data'에 저장한다.

2-3. sharedmemory close

# 공유 메모리 닫기
shared_mem.close()

공유 메모리에 대한 액세스를 닫는다. 리소스 관리를 위해 인스턴스가 더 이상 필요하지 않다면 모든 인스턴스가 close()를 호출해줘야한다.
close()를 해주었다고 공유 메모리 블록 자체가 파괴되고 사라지는 것은 아니다. 공유 메모리를 파괴하고 싶다면 아래와 같은 코드를 작성해주면 된다.

# 공유 메모리 파괴하기
shared_mem.unlink()

이를 토대로 내가 작성한 멀티 프로세스 통신 예제이다.

2-4. Shared Memory Write Process Example

from multiprocessing.shared_memory import SharedMemory
from multiprocessing import Process

FLAG_OFFSET = 0
DATA_OFFSET = 1

def task(shared_mem, num) :

    data1 = b'situation1'
    data2 = b'situation2'
    data3 = b'situation3'
    data4 = b'situation4'

    if num == 1 and shared_mem.buf[FLAG_OFFSET] == 0 :
        shared_mem.buf[DATA_OFFSET : len(data1) + 1] = data1
        shared_mem.buf[FLAG_OFFSET] = 1

    elif num == 2 and shared_mem.buf[FLAG_OFFSET] == 0 :
        shared_mem.buf[DATA_OFFSET : len(data1) + 1] = data2
        shared_mem.buf[FLAG_OFFSET] = 1

    elif num == 3 and shared_mem.buf[FLAG_OFFSET] == 0 :
        shared_mem.buf[DATA_OFFSET : len(data1) + 1] = data3
        shared_mem.buf[FLAG_OFFSET] = 1

    elif num == 4 and shared_mem.buf[FLAG_OFFSET] == 0 :
        shared_mem.buf[DATA_OFFSET : len(data1) + 1] = data4
        shared_mem.buf[FLAG_OFFSET] = 1

def run() :
    shared_mem = SharedMemory(name = 'EXAM_SharedMemory', size = 1024, create = True)
    shared_mem.buf[FLAG_OFFSET] = 0
    
    while True :
        ip = int(input('num = '))

        if ip == 9 :
            break
        
        task(shared_mem, ip)

    shared_mem.close()
    shared_mem.unlink()


if __name__ == '__main__' :
    run()

2-5. Shared Memory Read Process

from multiprocessing.shared_memory import SharedMemory
import setting_situation
import time

FLAG_OFFSET = 0
DATA_OFFSET = 1

condition = setting_situation.Situation()

def read_task() :
    shared_mem = SharedMemory(name = 'EXAM_SharedMemory')

    while True :
        if shared_mem.buf[FLAG_OFFSET] == 1 :
            size = shared_mem.size - 1
            data = bytes(shared_mem.buf[DATA_OFFSET : size]).decode('utf-8').rstrip('\x00')

            if data == 'situation1' :
                #condition.play_situation_1()
                print("11")
                time.sleep(10)

            elif data == 'situation2' :
                #condition.play_situation_2()
                print("22")
                time.sleep(10)

            elif data == 'situation3' :
                #condition.play_situation_3()
                print("33")
                time.sleep(10)

            elif data == 'situation4' :
                #condition.play_situation_4()
                print("44")
                time.sleep(10)

            shared_mem.buf[FLAG_OFFSET] = 0

        time.sleep(0.01)

if __name__ == '__main__' :
    read_task()

'SharedMemory'에서 주의할 점은 공유 메모리에 아무것도 쓰이지 않았는데 읽으려 하면 오류가 난다는 것이다.
그래서 나는 'FLAG'처리를 해주어 데이터가 있음을 알리는 방법을 사용하였다. 아직 main process가 닫히면 알리는 코드를 작성하지는 않았지만 같은 'FLAG'처리 방법으로 작성해줄 생각이다. 아주 간단하고 1:1 통신이라 따로 세마포어나 뮤택스는 넣어놓지 않았다!

3. 회고

아직 많이 멀었지만 기본적으로 예제를 작성해보며 살짝 공부해봤지만 thread와는 다른 느낌이라 새로웠다.
thread공부하면서 겉핥기식으로 배운 process가 많이 도움이 되었다! 아무 책이라도 읽어 놓으면 도움이 된다는 아버지의 말씀이 생각나는 하루였다..😁

0개의 댓글