[Python] 프로세스 (Process)

hukim·2020년 9월 20일
2

Python

목록 보기
9/12
post-custom-banner


이전 포스팅에서 thread를 두개 만들고 각각 동기화했지만 시간이 크게 단축되지 않았습니다.
결국에는 동시에 두 가지의 작업을 하지는 못한 것입니다.

Process

하지만 프로세스를 만들면 프로세스 별로 각각 별도의 메모리 영역을 가지게 되며 큐, 파이프 파일 등을 이용한 프로세스 간 통신(IPC, Inter-Process Communication)과 같은 방법으로 통신을 구현할 수 있습니다.

또한 멀티 프로세스 프로그램은 별도의 메모리 영역을 가지고 있기 때문에 여러 개의 작업을 나누어서 처리할 수 있습니다. 작업을 병렬로 나눠서 작업하면 하나의 프로세스 때보다 두배이상 빠르게 작업할 수 있습니다.

from multiprocessing import Process, Queue
import time

def worker(id, number, q):
    increased_number = 0

    for i in range(number):
        increased_number += 1
    
    q.put(increased_number)

    return


if __name__ == "__main__":

    start_time = time.time()
    q = Queue()

    th1 = Process(target=worker, args=(1, 50000000, q))
    th2 = Process(target=worker, args=(2, 50000000, q))

    th1.start()
    th2.start()
    th1.join()
    th2.join()


    print("--- %s seconds ---" % (time.time() - start_time))
    q.put('exit')

    total = 0
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp

    print("total_number=",end=""), print(total)
    print("end of main")

위의 프로그램은 각각의 프로세스에서 공통의 큐를 사용해서 증가시킨 값을 넣고 메인 쓰레드에서 다시 꺼내온 후 합해서 총 증가시킨 값을 구하는 프로그램입니다.

이전 포스팅에서는 약 11초 정도 걸렸었는데 멀티 프로세스를 이용하니까 시간이 확실히 두배이상 단축되었습니다.

공유 메모리

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)
  • name : 공유 메모리의 이름
  • create : 공유 메모리 블록을 만들지(True) 기존 공유 메모리 블록을 연결할지(False)의 여부 선택
  • size : 새로운 공유 메모리를 만들 때 요청된 바이트 수를 지정

이번에는 프로세스의 IPC방식 중 공유 메모리 방식으로 위의 프로그램을 수정해보겠습니다.

from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time


def worker(id, number, a, shm, serm):
    num = 0 
    for i in range(number):
        num += 1
        
    serm.acquire() #세마포어로 공유 메모리에 프로세스 한 개만 접근하도록 제한
    exst_shm = shared_memory.SharedMemory(name=shm) # 공유 메모리에 연결
    
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=exst_shm.buf) #공유 메모리에 배열 생성
    b[0] += num # 최종 결과를 배열에 저장
    
    serm.release()

if __name__ == "__main__":
    serm = Semaphore(1)
    start_time = time.time()

    a = np.array([0])
    shm = shared_memory.SharedMemory(
        create=True, size=a.nbytes)  # 공유 메모리 블록 생성
    # 공유 메모리에 NumPy 배열을 만든다 > 프로세스에서 만든 NumPy 배열의 변경을 반영
    c = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    th1 = Process(target=worker, args=(1, 50000000, a, shm.name, serm))
    th2 = Process(target=worker, args=(2, 50000000, a, shm.name, serm))

    th1.start()
    th2.start()
    th1.join()
    th2.join()

    print("--- %s seconds ---" % (time.time() - start_time))
    print("total_number=", end=""), print(c[0])
    shm.close()
    shm.unlink()
    print("end of main")

위의 큐를 사용한 코드와 비슷하게 결과를 얻을 수 있습니다.

post-custom-banner

0개의 댓글