[multiprocessing] shared memory

About_work·2024년 3월 17일
0

process, thread

목록 보기
21/23

1. 장단점/사용목적

1.1. 사용 시나리오

  • 대량의 데이터(예: numpy 배열)를 프로세스 간에 공유할 때 사용

1.2. 장점

  • 메모리를 직접 공유하기 때문에 대량의 데이터를 빠르게 전달할 수 있음.
  • 데이터 복사 과정이 없어, 다른 IPC 메커니즘에 비해 효율적

1.3. 단점

  • 동시성 제어(comcurrency control)를 개발자가 직접 관리해야 함
    • 예를 들어, 뮤텍스(mutexes)나 세마포어(semaphores)를 사용해야 할 수도 있음
  • 데이터 구조가 고정되어야 하며, 동적인 데이터 구조에는 적합하지 않을 수 있음
  • 네트워크를 통해 다른 머신들과 array를 공유할 때에는 작동하지 않는다.
    • 이럴 때는 redis나 다른 기술에 의존해야 한다.

1.4. 지원 가능한 데이터 type

  • 바이트-호환 데이터 (bytes, bytearray 등)
  • array.array, numpy.array 같은 배열 데이터
  • C의 구조체를 모방한 ctypes 모듈의 데이터 구조

2. 사용법

SharedMemory(name=None, create=False, size=0)

2.1. 본문

  • 새 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결
  • 각 공유 메모리 블록에는 고유한 이름이 지정
  • 이런 식으로, 하나의 프로세스가 특정 이름을 가진 공유 메모리 블록을 생성 할 수 있으며,
    • 다른 프로세스가 같은 이름을 사용하여 같은 공유 메모리 블록에 연결할 수 있음
  • 공유 메모리 블록은 생성한 원래 프로세스보다 오래갈 수 있음
  • close() 메서드
    • 한 프로세스가, 더는 다른 프로세스가 필요로 할 수도 있는 공유 메모리 블록에 대한 액세스를 필요로하지 않으면
  • unlink()
    • 어떤 프로세스에서도 공유 메모리 블록이 더는 필요하지 않으면, 적절한 정리를 위해

2.2. parameter 소개

  • name
    • 문자열로 지정된 요청된 공유 메모리의 고유한 이름
    • 새 공유 메모리 블록을 만들 때, 이름에 None(기본값)이 제공되면, 새로운 이름이 생성
  • create
    • 새 공유 메모리 블록을 만들지(True), 또는 기존 공유 메모리 블록을 연결할지(False)를 제어
  • size
    • 새 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정
    • 일부 플랫폼은, 해당 플랫폼의 메모리 페이지 크기를 기반으로 메모리 덩어리를 할당하기 때문에, 공유 메모리 블록의 정확한 크기는 요청한 크기보다 크거나 같을 수 있음
    • 기존 공유 메모리 블록에 연결할 때는, size 매개 변수가 무시
  • 새로 공유 메모리 생성할 때
    create=True / size=크기
  • 기존 메모리에 붙일 때
    • 기존 메모리의 name을 명시해 주어야 함.
from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
from typing import Tuple


def worker(id: int, number: int, np_array: np.ndarray,
           shm: shared_memory.SharedMemory, sam: Semaphore) -> None:
    """작업자 프로세스에서 실행되는 함수.

    Args:
        id (int): 작업자 ID.
        number (int): 반복할 숫자.
        np_array (np.ndarray): 공유할 NumPy 배열.
        shm (shared_memory.SharedMemory): 공유 메모리 객체.
        sam (Semaphore): 세마포어 객체.
    """
    b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
    sam.acquire()
    for i in range(number):
        b[0] += 1
    sam.release()


if __name__ == "__main__":
    sam: Semaphore = Semaphore(1)
    np_array: np.ndarray = np.array([0])
    start_time: float = time.time()
    shm: shared_memory.SharedMemory = shared_memory.SharedMemory(
        create=True, size=np_array.nbytes)

    b: np.ndarray = np.ndarray(np_array.shape,
                               dtype=np_array.dtype,
                               buffer=shm.buf)
    th1: Process = Process(target=worker,
                           args=(1, 50000000, np_array, shm, sam))
    th2: Process = Process(target=worker,
                           args=(2, 50000000, np_array, shm, sam))

    th1.start()
    th2.start()
    th1.join()
    th2.join()

    # print(b[0], 123)

    # existing_shm.close()

    print("--- %s seconds ---" % (time.time() - start_time))
    print(f"sheard memory : {b[0]}")
    shm.close()
    shm.unlink()

    print("total_number=", end="")
    print("end of main")
import numpy as np
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple, Any


def modify_shared_memory(name: str, shape: Tuple[int, ...], dtype: Any) -> None:
    """공유 메모리에 접근하여 NumPy 배열을 수정하는 함수.

    Args:
        name (str): 공유 메모리의 이름.
        shape (Tuple[int, ...]): NumPy 배열의 형태.
        dtype (Any): NumPy 배열의 데이터 타입.
    """
    # 공유 메모리에 접근합니다.
    existing_shm = SharedMemory(name=name)
    # 공유 메모리를 NumPy 배열로 매핑합니다.
    np_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # NumPy 배열을 수정합니다. 모든 변경사항은 공유됩니다.
    np_array += 1
    # 공유 메모리 연결을 종료합니다.
    existing_shm.close()


if __name__ == '__main__':
    # 원본 NumPy 배열을 생성합니다.
    original_array: np.ndarray = np.arange(10, dtype=np.float64)
    print("Original array:", original_array)
    # 공유 메모리를 생성합니다.
    shm: SharedMemory = SharedMemory(create=True, size=original_array.nbytes)
    # 공유 메모리를 NumPy 배열로 매핑합니다.
    shared_array: np.ndarray = np.ndarray(original_array.shape,
                                          dtype=original_array.dtype,
                                          buffer=shm.buf)
    shared_array[:] = original_array  # 데이터를 복사합니다.

    # 공유 메모리를 수정하는 프로세스를 생성하고 시작합니다.
    p: Process = Process(target=modify_shared_memory,
                         args=(shm.name, original_array.shape,
                               original_array.dtype))
    p.start()
    p.join()

    # 수정된 공유 메모리 내용을 확인합니다.
    print(shared_array)
    # 공유 메모리를 정리합니다.
    shm.close()
    shm.unlink()

5.4.5. Semaphores / Mutexes

  • multiprocessing 모듈을 사용하여 SharedMemory를 이용할 때, 뮤텍스(mutexes)나 세마포어(semaphores)와 같은 동기화 메커니즘을 사용하여 프로세스 간의 데이터 접근을 동기화할 수 있음
  • 이러한 동기화 기법은 데이터 경쟁(두 개 이상의 프로세스가 동시에 공유 데이터를 수정하려 할 때 발생하는 문제)을 방지하는 데 중요
5.4.5.1. 세마포어(Semaphores)
  • 세마포어는 동시에 리소스에 접근할 수 있는 프로세스의 수를 제한하는 방법으로 사용
  • multiprocessing 모듈에서는 Semaphore를 사용하여 이를 구현할 수 있음
5.4.5.2. 뮤텍스(Mutexes)
  • 뮤텍스는 세마포어의 특별한 경우로, 동시에 한 프로세스만이 리소스에 접근할 수 있게 함
  • Python에서는 Lock을 사용하여 뮤텍스를 구현할 수 있음
5.4.5.3. 예제 코드
from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time

def worker(shm_name, sem: Semaphore):
    # 공유 메모리 접근
    shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
    
    with sem:  # 세마포어로 동기화된 블록
        # 공유 메모리 데이터 수정
        for i in range(10):
            array[i] += 1
        print("Data updated by process:", array[:])
    
    # 공유 메모리 정리 (여기서는 생략)

if __name__ == "__main__":
    # 초기 데이터 준비
    original_data = np.arange(10, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=original_data.nbytes)
    array = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
    array[:] = original_data

    # 세마포어 생성
    sem = Semaphore(1)  # 동시에 하나의 프로세스만 접근 허용

    # 프로세스 생성 및 시작
    processes = [Process(target=worker, args=(shm.name, sem)) for _ in range(2)]
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()
    
    # 최종 데이터 출력
    print("Final data in shared memory:", array[:])

    # 공유 메모리 정리
    shm.close()
    shm.unlink()

  • Python의 multiprocessing.shared_memory.SharedMemory 클래스는
  • 공유 메모리 세그먼트를 생성할 때, 명시적으로 크기를 지정하지 않으면 내부적으로 정해진 기본 크기를 사용합니다.
  • 이 기본 크기는 운영 체제와 Python의 구현에 따라 달라질 수 있으며, 특정 환경에서 16384 (16KB)로 설정되어 있을 수 있습니다.
  • 공유 메모리 객체를 생성할 때 size 매개변수에 직렬화된 데이터의 길이를 전달하고 있지만,
  • 실제로 공유 메모리 세그먼트의 최소 할당 크기는 시스템의 페이지 크기나 다른 내부 정책에 의해 결정될 수 있습니다.
  • 많은 운영 체제에서는 메모리를 페이지 단위로 관리하며, 이 페이지의 크기는 보통 4KB 또는 그 이상일 수 있습니다.
  • 따라서 요청된 크기가 이 페이지 크기보다 작더라도, 실제 할당되는 메모리 크기는 페이지 크기 또는 여러 페이지의 크기가 될 수 있습니다.

  • SharedMemory 객체가 생성될 때, 요청된 크기보다 큰 메모리 세그먼트가 할당되는 경우,
    할당된 실제 크기는 size 속성을 통해 확인할 수 있습니다.
  • 이 경우, 할당된 공유 메모리의 크기는 운영 체제와 Python 구현의 내부 정책에 따라 결정되며, 특정 환경에서는 이 크기가 16384로 고정될 수 있습니다.
  • 이러한 행동은 특히 직렬화된 데이터의 크기가 다양하더라도 관찰될 수 있으며, 공유 메모리 사용을 위한 최소 요구 사항을 충족하기 위한 시스템의 방식입니다.

  • 16384 바이트보다 큰 데이터를 저장하려는 경우,
  • 예를 들어, 직렬화된 데이터의 크기가 20000 바이트라면, SharedMemory 객체의 size 매개변수로 20000을 지정하게 됩니다.
  • 이 경우, 운영 체제는 최소한 20000 바이트를 저장할 수 있는 공유 메모리 세그먼트를 할당하게 됩니다.
  • 실제 할당된 공유 메모리 세그먼트의 크기(existing_shm.size)는 20000 바이트 이상이 될 것이며, 이는 운영 체제의 페이지 크기와 메모리 관리 정책에 따라 결정됩니다.

  • 운영 체제는 메모리를 페이지 단위로 관리하기 때문에, 요청된 크기가 페이지 크기의 정수 배가 아닐 경우,
  • 실제 할당된 크기는 요청된 크기를 포함할 수 있는 가장 작은 페이지 크기의 배수가 됩니다.
  • 예를 들어, 페이지 크기가 4096 바이트이고 요청된 크기가 20000 바이트라면,
  • 할당된 공유 메모리의 크기는 20480 바이트(4096 * 5)가 될 수 있습니다.
  • 이는 existing_shm.size를 통해 확인할 수 있습니다.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글