이전 포스팅은 Thread에 대해 다루었는데요! 2개의 스레드를 만들었지만 시간이 단축되지 않고 1억까지 증가되지 않았습니다. 하지만 병렬로 두개의 프로세스를 만들어 병렬의 프로그래밍 방식으로 구현하면 시간을 단축할 수 있는데요!
프로세스를 만들면 각 프로세스별로 별도의 메모리 영역을 가지며 queue
나 pipe
, shared memory
를 이용한 IPC
(Inter-process-communication_프로세스간 통신)과 같은 방법으로 객체들의 교환을 구현할 수 있습니다. '멀티 프로세스 프로그램'은 각각 별도의 메모리 영역을 가지고, 여러 작업을 동시에 나눠서 처리할 수 있습니다. 그리고 병렬로 작업을 처리하니 하나의 프로세스에서 작업하던 것을 두배 이상 빠르게 할 수 있습니다.
다음의 코드는 thread를 생성했던 것처럼 multiprocessing
패키지에서 process
클래스로부터 객체를 만들어 각각의 프로세스에 50000000씩 증가킨 후 q
에 삽입해서 최종으로 1억까지 증가시키는 코드입니다.
from multiprocessing import Process, Queue
import time
def worker(id, number, q):
increased_number = 0
for i in range(number):
increased_number += 1
q.put(increased_number)
return
if __name__ == "__main__":
start_time = time.time()
q = Queue()
th1 = Process(target=worker, args=(1, 50000000, q))
th2 = Process(target=worker, args=(2, 50000000, q))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
q.put('exit')
total = 0
while True:
tmp = q.get()
if tmp == 'exit':
break
else:
total += tmp
print("total_number=",end=""), print(total)
print("end of main")
두개의 쓰레드를 사용해서 구현했을 때보다 두개의 프로세스로 병렬 프로그래밍을 구현하면 훨씬 더 빠르게 진행됨을 알 수 있습니다.
프로세스의 IPC 방법은 앞에서 설명했듯, queue
, pipe
, shared memory
등이 있는데요. 파이썬의 multiprocessing은 queue
와 pipe
두 가지 유형의 IPC를 지원하며 각각 queue()나 pipe()로 호출하면 됩니다.
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue() # queue
p = Process(target=f, args=(q,))
p.start()
p.join()
print(q.get())
# "[42, None, 'hello']"
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
p.join()
print(parent_conn.recv())
# "[42, None, 'hello']"
스레드는 프로세스를 나눈 단위이며 공유 데이터를 사용할 때 제약이 없지만, 프로세스는 독집적인 메모리 공간을 가지기 때문에 shared_memory
를 사용합니다. 공유 메모리를 사용하면 메모리의 일부 공간을 각각의 독립적인 프로세스에서 공유하고, 해당 메모리를 통해 데이터를 주고받을 수 있습니다.
semaphore
는 지정된 변수만큼의 프로세스 혹은 쓰레드가 자원에 접근할 수 있도록 하는 방법입니다. 공유 메모리에 여러 프로세스가 동시에 사용하기 시작하면 데이터가 손상될 수도 있습니다. 그래서 여러 프로세스 사이에 동작의 순서를 지정해주어야 하는데 이 때 semaphore
가 사용됩니다.
Mutex(상호배제)는 오직 하나의 프로세스 혹은 쓰레드만 자원에 접근하도록 하지만 Semaphore는 여러 대상을 처리할 수 있고 공유 메모리라는 임계 영역을 사용할 수있도록 합니다. 그리고 Lock
을 사용하지 않기에 현재 수행하지 않는 다른 프로세스가 semaphore를 해제할 수 있습니다.
다음의 코드는 queue를 통한 IPC를 구현한 코드를 shared_memory
방식과 semaphore
를 사용한 프로그램입니다.
from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
def worker(id, number, shm, arr, sem):
#매개 변수 : 프로세스명, 숫자, 공유메모리, 공유메모리를 저장할 배열, 세마포어 객체
increased_number = 0
for i in range(number):
increased_number += 1
sem.acquire()
# 기존 공유 메모리 블록에 연결
existing_shm = shared_memory.SharedMemory(name=shm)
# numpy 배열 형태에 맞게 변환
tmp_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=existing_shm.buf)
# 각각의 프로세스에서 연산한 값을 합해서 numpy 배열에 저장
tmp_arr[0] += increased_number
sem.release() # mutex와 동일
if __name__ == "__main__":
start_time = time.time()
sem = Semaphore()
# 숫자를 저장할 numpy 배열 생성
arr = np.array([0])
# 공유 메모리 생성
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
# 공유 메모리의 버퍼를 numpy 배열로 변환
np_shm = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
th1 = Process(target=worker, args=(1, 50000000, shm.name, np_shm, sem))
th2 = Process(target=worker, args=(2, 50000000, shm.name, np_shm, sem))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
#프로세스에서 계산된 값을 저장한 np_shm 배열 출력
print("total_number=",end=""), print(np_shm[0])
print("end of main")
# 공유 메모리 사용 종료
shm.close()
# 공유 메모리 블록 삭제
shm.unlink()