저번 포스팅에서 쓰레드에 대해서 알아보았다. 쓰레드를 사용해서 두개의 쓰레드를 동작시켰을 때 원하는 출력값은 얻을 수 있었지만 프로그램 동작 시간에서는 크게 차이를 보이지 않았다. 그러한 동작 속도부분을 개선시키기 위해서 이번에는 process에 대해서 학습해보자. 쓰레드가 동시성을 띄고있다면, 프로세스는 병렬성을 띄고 있다고 볼 수 있다. 멀티 프로세스 프로그램은 각각 별도의 메모리 영역을 가지고 여러 작업을 동시에 나눠서 처리할 수 있다. 프로세스를 생성하고 사용한다는 것은 GIL을 피하고 프로세스 별로 별도의 메모리 영역을 가지며, 큐, 파이프 파일 등을 이용한 프로세스 간 통신(IPC - Inter-process communication)과 같은 방법으로 통신을 구현할 수 있다는 것을 의미한다. 즉,하나의 프로세스에서 작업하던 것을 두배이상 빠르게 처리할 수 있다.
multiprocessing은 두 가지 유형의 프로세스 간 통신 채널을 지원한다.
여러 생산자와 소비자를 허용
프로세스의 thread와 process를 안전하게 만든다.
Exception
두 프로세스 간의 연결
queue
보다 빠르다.from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn, ))
p.start()
print(parent_conn.recv())
p.join()
>>>
[42, None, 'hello']
Pipe()
가 리턴하는 parent_conn
과 child_conn
은 파이프의 두 끝을 나타낸다.
send()
와 recv()
메소드로 데이터를 교환할 수 있다.
세마포어 객체를 구현한다. 주어진 value가 0보다 작으면 ValueError가 발생한다.
acquire(blocking=True, timeout=None)
세마포어를 획득한다.
인자 없이 호출될 때:
진입시 내부 카운터가 0보다 크면, 1 감소시키고 즉시 True를 반환한다.
진입시 내부 카운터가 0이면, release()를 호출하여 깨울 때 까지 블록한다. 일단 깨어나면 (카운터가 0보다 크면), 카운터를 1 줄이고 True를 반환한다.
release()를 호출할 때 마다 정확히 하나의 스레드가 깨어난다. 스레드가 깨어나는 순서에 의존해서는 안된다.
False로 설정한 blocking으로 호출하면 블록 하지 않는다. 인자가 없는 호출이 블록 할 것이라면, 즉시 False를 반환한다. 그렇지 않으면 인자 없이 호출할 때와 같은 작업을 수행하고 True를 반환한다.
None 이외의 timeout으로 호출하면, 최대 timeout 초 동안 블록 한다. 그 사이 획득이 완료되지 않으면, False를 반환한다. 완료되면 True를 반환한다.
release()
내부 카운터를 1 증가시키면서 세마포어를 해제한다. 진입 시 0이고 다른 스레드가 다시 0보다 커리기를 기다리고 있으면, 해당 스레드를 깨운다.
Mutex vs Semaphore
뮤텍스를 공부하다보니 세마포어라는 개념도 자꾸 등장하여, 함께 정리를 해놓으려한다.
뮤텍스란(Mutex)?
뮤텍스 객체를 두 쓰레드가 동시에 사용할 수 없다.
세마포어란?(Semaphore)
세마포어는 운영체제의 리소스를 경쟁적으로 사용하는 다중 프로세스에서 행동을 조정하거나 또는 동기화 시키는 기술.
1) Semaphore는 Mutex가 될 수 있지만 Mutex는 Semaphore가 될 수 없다.
(Mutex 는 상태가 0, 1 두 개 뿐인 binary Semaphore)
2) Semaphore는 소유할 수 없는 반면, Mutex는 소유가 가능하며 소유주가 이에 대한 책임을 진다. (Mutex 의 경우 상태가 두개 뿐인 lock 이므로 lock 을 ‘가질’ 수 있다.)
3) Mutex의 경우 Mutex를 소유하고 있는 쓰레드가 이 Mutex를 해제할 수 있다. 하지만 Semaphore의 경우 이러한 Semaphore를 소유하지 않는 쓰레드가 Semaphore를 해제할 수 있다.
4) Semaphore는 시스템 범위에 걸쳐있고 파일시스템상의 파일 형태로 존재한다. 반면 Mutex는 프로세스 범위를 가지며 프로세스가 종료될 때 자동으로 Clean up된다.
★★★ 가장 큰 차이점은 관리하는 동기화 대상의 갯수이다. Mutex는 동기화 대상이 오직 하나뿐일 때, Semaphore는 동기화 대상이 하나 이상일 때 사용한다.
출처 : Mutex와 Semaphore의 차이
class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)
새로운 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결할 수 있다. 각 공유 메모리에는 이름을 지정할 수 있으며, 같은 이름을 사용하는 공유 메모리 블록에 연결할 수 있다.
한 프로세스가 더 이상 공유 메모리 블록에 대한 액세스를 필요로하지 않으면 close() 메서드를 호출해야 한다. 어떤 프로세스에서도 공유 메모리 블록이 필요하지 않으면, 정리를 위해 unlink() 메서드를 호출해야 한다.
name
: 문자열로 지정된 공유 메모리의 이름이다.
create
: 새로운 공유 메모리 블록을 만들지(True), 기존의 블록에 연결할지(Flase) 결정한다.
size
: 새로운 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정한다. 그렇게 생성된 블록의 사이즈는 요청한 크기와 정확히 같거나 더 클 수도있다. 기존 메모리 블록에 연결할 때는 무시된다.
close()
: 공유 메모리에 대한 액세스를 닫는다. close()를 호출해도 공유 메모리 블록은 파괴되지 않는다.
unlink()
: 메모리 블록이 삭제되도록 요청한다. 공유 메모리 블록에 접근하는 모든 프로세스들 중 하나가 단 한번만 호출해야 한다. 파괴를 요청한 후, 즉시 파괴될 수도 있고 그렇지 않을 수도 있다.
buf
: 공유 메모리 블록 내용에 대한 메모리 뷰
name
: 공유 메모리 블록의 고유한 이름에 대한 읽기 전용 액세스
size
: 공유 메모리 블록 크기(바이트)에 대한 읽기 전용 액세스
위의 코드 기반으로 IPC방식 중 공유 메모리 방식으로 두개의 프로세스로 50000000씩 증가시키고 공유 메모리를 사용해서 세마포어로 동기화 시켜 최종 값으로 1억을 만드는 코드를 구현해 보세요.
from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
def worker(id, number, new_array, shm, sema):
increased_number = 0
for i in range(number):
increased_number += 1
sema.acquire()
ex_shm = shared_memory.SharedMemory(name=shm)
b = np.ndarray(new_array.shape, dtype=new_array.dtype, buffer=ex_shm.buf)
b[0] += increased_number
sema.release()
if __name__ == "__main__":
start_time = time.time()
sema = Semaphore(1) # semaphore 객체 생성
new_array = np.array([0]) # 1차원 numpy 배열 생성
shm = shared_memory.SharedMemory(create=True, size=new_array.nbytes)
c = np.ndarray(new_array.shape, dtype=new_array.dtype, buffer=shm.buf)
th1 = Process(target=worker, args=(1, 50000000, new_array, shm.name, sema))
th2 = Process(target=worker, args=(2, 50000000, new_array, shm.name, sema))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
print("total_number=",end=""), print(c[0])
print("end of main")
shm.close()
shm.unlink()
쓰레드를 사용했을 때보다 확실히ㅂ 빨라진 결과를 확인할 수 있다.😊
멀티 프로세스 : https://docs.python.org/3/library/multiprocessing.html
공유 메모리 : https://docs.python.org/3/library/multiprocessing.shared_memory.html
세마포어 : https://docs.python.org/3/library/threading.html#semaphore-objects