[TIL - 11 / Python] process

haejun-kim·2020년 7월 27일
0

[Python]

목록 보기
18/19
post-thumbnail

process

저번 포스팅에서 쓰레드에 대해서 알아보았다. 쓰레드를 사용해서 두개의 쓰레드를 동작시켰을 때 원하는 출력값은 얻을 수 있었지만 프로그램 동작 시간에서는 크게 차이를 보이지 않았다. 그러한 동작 속도부분을 개선시키기 위해서 이번에는 process에 대해서 학습해보자. 쓰레드가 동시성을 띄고있다면, 프로세스는 병렬성을 띄고 있다고 볼 수 있다. 멀티 프로세스 프로그램은 각각 별도의 메모리 영역을 가지고 여러 작업을 동시에 나눠서 처리할 수 있다. 프로세스를 생성하고 사용한다는 것은 GIL을 피하고 프로세스 별로 별도의 메모리 영역을 가지며, 큐, 파이프 파일 등을 이용한 프로세스 간 통신(IPC - Inter-process communication)과 같은 방법으로 통신을 구현할 수 있다는 것을 의미한다. 즉,하나의 프로세스에서 작업하던 것을 두배이상 빠르게 처리할 수 있다.

multiprocessing

multiprocessing은 두 가지 유형의 프로세스 간 통신 채널을 지원한다.

큐(Queue)

여러 생산자와 소비자를 허용

프로세스의 thread와 process를 안전하게 만든다.

  • Queue(maxsize = 0) : FIFO Queue, max size는 큐에 배치될 수 있는 항목 수를 설정할 수 있다. max size가 0 보다 작거나 같으면, queue 크기는 무한하다
  • LifoQueue(maxsize=0) : LIFO Queue
  • PriorityQueue(maxsize=0) : 우선순위 queue, 가장 낮은 값을 갖는 항목이 먼저 꺼내진다. >sorted(list(entries)))[0]에 의해 리턴되는 항목
  • SimpleQueue : 상한 없는 FIFO Queue, 작업 추적과 같은 고급 기능이 없다.

Exception

  • Empty : get()이 비어있는 queue 객체에 호출될 때 발생
  • Full : put()이 가득 찬 queue객체에 호출될 때 발생
  • qsize(): queue의 크기 리턴
  • empty(): queue가 비어있으면 True 리턴
  • full(): queue가 가득차면 True 리턴
  • put(item, block=True, timeout=None): queue에 item을 넣는다
    • block이 참이고 timeout이 None(기본값) 이면, 사용 가능한 슬롯이 확보될 때까지 블록한다
    • timeout이 양수면, 최대 timeout초 동안 블록하고 그 시간 내에 사용 가능한 슬롯이 없으면 Full exception이 발생한다
    • block이 거짓이면, 빈 슬롯이 즉시 사용할 수 있으면 queue에 item을 넣고, 그렇지 않으면 Full exception이 발생한다 (timeout 무시된다)
  • put_nowait(item) : put(item,False)와 동일
  • get(block=True, timeout=None): queue에서 item을 제거하고 리턴한다
    • block이 참이고 timeout이 None이면, 항목이 사용 가능할 때 까지 블록한다
    • timeout이 양수면 최대 timeout 초 동안 블록하고 그 시간 내에 사용 가능한 항목이 없으면 empty exception이 발생한다
    • block이 거짓이면, 즉시 사용할 수 있는 항목이 있으면 반환하고, 그렇지 않으면 empty exception이 발생한다 (timeout 무시된다)
  • get_nowait() : get(False)와 동일
  • task_done() : queue에 넣은 작업이 완료되었음을 나타낸다. get()호출 후에 task_done()호출은 작업에 대한 처리가 완료되었음을 queue에 알려준다.
    • join()이 블로킹 중이면, 모든 항목이 처리되면(task_done()이 호출됨) 재개된다
  • join() : queue의 모든 항목을 꺼내서 처리할 때까지 블록한다
    • 완료되지 않은 작업 카운트는 항목이 queue에 추가될 때마다 올라간다
    • task_done()을 호출해서 작업이 모두 완료되었음을 나타내면 카운트가 내려간다
    • 작업 카운트가 0이 되면 join() 이 블록 해제된다

파이프(Pipe)

두 프로세스 간의 연결

  • 서로 연결된 한 쌍의 객체로 이루어지며, 한쪽에서 다른쪽으로 데이터를 보낼 수 있다.
  • queue보다 빠르다.
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn, ))
    p.start()
    print(parent_conn.recv())
    p.join()
    
>>>
[42, None, 'hello']

Pipe()가 리턴하는 parent_connchild_conn은 파이프의 두 끝을 나타낸다.
send()recv() 메소드로 데이터를 교환할 수 있다.

semaphore?

세마포어 객체를 구현한다. 주어진 value가 0보다 작으면 ValueError가 발생한다.

acquire(blocking=True, timeout=None)
세마포어를 획득한다.

인자 없이 호출될 때:

  • 진입시 내부 카운터가 0보다 크면, 1 감소시키고 즉시 True를 반환한다.

  • 진입시 내부 카운터가 0이면, release()를 호출하여 깨울 때 까지 블록한다. 일단 깨어나면 (카운터가 0보다 크면), 카운터를 1 줄이고 True를 반환한다.

  • release()를 호출할 때 마다 정확히 하나의 스레드가 깨어난다. 스레드가 깨어나는 순서에 의존해서는 안된다.

  • False로 설정한 blocking으로 호출하면 블록 하지 않는다. 인자가 없는 호출이 블록 할 것이라면, 즉시 False를 반환한다. 그렇지 않으면 인자 없이 호출할 때와 같은 작업을 수행하고 True를 반환한다.

  • None 이외의 timeout으로 호출하면, 최대 timeout 초 동안 블록 한다. 그 사이 획득이 완료되지 않으면, False를 반환한다. 완료되면 True를 반환한다.

  • release()
    내부 카운터를 1 증가시키면서 세마포어를 해제한다. 진입 시 0이고 다른 스레드가 다시 0보다 커리기를 기다리고 있으면, 해당 스레드를 깨운다.

Mutex vs Semaphore

뮤텍스를 공부하다보니 세마포어라는 개념도 자꾸 등장하여, 함께 정리를 해놓으려한다.

뮤텍스란(Mutex)?
뮤텍스 객체를 두 쓰레드가 동시에 사용할 수 없다.

세마포어란?(Semaphore)
세마포어는 운영체제의 리소스를 경쟁적으로 사용하는 다중 프로세스에서 행동을 조정하거나 또는 동기화 시키는 기술.

1) Semaphore는 Mutex가 될 수 있지만 Mutex는 Semaphore가 될 수 없다.
(Mutex 는 상태가 0, 1 두 개 뿐인 binary Semaphore)

2) Semaphore는 소유할 수 없는 반면, Mutex는 소유가 가능하며 소유주가 이에 대한 책임을 진다. (Mutex 의 경우 상태가 두개 뿐인 lock 이므로 lock 을 ‘가질’ 수 있다.)

3) Mutex의 경우 Mutex를 소유하고 있는 쓰레드가 이 Mutex를 해제할 수 있다. 하지만 Semaphore의 경우 이러한 Semaphore를 소유하지 않는 쓰레드가 Semaphore를 해제할 수 있다.

4) Semaphore는 시스템 범위에 걸쳐있고 파일시스템상의 파일 형태로 존재한다. 반면 Mutex는 프로세스 범위를 가지며 프로세스가 종료될 때 자동으로 Clean up된다.

★★★ 가장 큰 차이점은 관리하는 동기화 대상의 갯수이다. Mutex는 동기화 대상이 오직 하나뿐일 때, Semaphore는 동기화 대상이 하나 이상일 때 사용한다.

출처 : Mutex와 Semaphore의 차이

shared_memory?

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

새로운 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결할 수 있다. 각 공유 메모리에는 이름을 지정할 수 있으며, 같은 이름을 사용하는 공유 메모리 블록에 연결할 수 있다.

한 프로세스가 더 이상 공유 메모리 블록에 대한 액세스를 필요로하지 않으면 close() 메서드를 호출해야 한다. 어떤 프로세스에서도 공유 메모리 블록이 필요하지 않으면, 정리를 위해 unlink() 메서드를 호출해야 한다.

  • name : 문자열로 지정된 공유 메모리의 이름이다.

  • create : 새로운 공유 메모리 블록을 만들지(True), 기존의 블록에 연결할지(Flase) 결정한다.

  • size : 새로운 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정한다. 그렇게 생성된 블록의 사이즈는 요청한 크기와 정확히 같거나 더 클 수도있다. 기존 메모리 블록에 연결할 때는 무시된다.

  • close() : 공유 메모리에 대한 액세스를 닫는다. close()를 호출해도 공유 메모리 블록은 파괴되지 않는다.

  • unlink() : 메모리 블록이 삭제되도록 요청한다. 공유 메모리 블록에 접근하는 모든 프로세스들 중 하나가 단 한번만 호출해야 한다. 파괴를 요청한 후, 즉시 파괴될 수도 있고 그렇지 않을 수도 있다.

  • buf : 공유 메모리 블록 내용에 대한 메모리 뷰

  • name : 공유 메모리 블록의 고유한 이름에 대한 읽기 전용 액세스

  • size : 공유 메모리 블록 크기(바이트)에 대한 읽기 전용 액세스


Assignment

위의 코드 기반으로 IPC방식 중 공유 메모리 방식으로 두개의 프로세스로 50000000씩 증가시키고 공유 메모리를 사용해서 세마포어로 동기화 시켜 최종 값으로 1억을 만드는 코드를 구현해 보세요.

from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time

def worker(id, number, new_array, shm, sema):
    increased_number = 0
    for i in range(number):
        increased_number += 1
    sema.acquire()
    ex_shm = shared_memory.SharedMemory(name=shm)
    b = np.ndarray(new_array.shape, dtype=new_array.dtype, buffer=ex_shm.buf)
    b[0] += increased_number
    sema.release()
    

if __name__ == "__main__":

    start_time = time.time()
    sema = Semaphore(1) # semaphore 객체 생성
    new_array = np.array([0]) # 1차원 numpy 배열 생성
    shm = shared_memory.SharedMemory(create=True, size=new_array.nbytes)
    c = np.ndarray(new_array.shape, dtype=new_array.dtype, buffer=shm.buf)

    th1 = Process(target=worker, args=(1, 50000000, new_array, shm.name, sema))
    th2 = Process(target=worker, args=(2, 50000000, new_array, shm.name, sema))

    th1.start()
    th2.start()
    th1.join()
    th2.join()

    print("--- %s seconds ---" % (time.time() - start_time))
    print("total_number=",end=""), print(c[0])
    print("end of main")
    shm.close()
    shm.unlink()
  1. semaphore, numpy_array, shared_memory를 생성해준다.
  2. 세마포어로 공유 메모리의 데이터를 다른 프로세스가 접근하지 못하게 block 처리한다.
  3. 같은 공유 메모리 연결한다.(block)
  4. 공유 메모리 블록의 버퍼를 넘파이 배열로 변환한다.
  5. 5천만이라는 값을 증가시키고 연산 결과를 배열에 저장한다.
  6. 세마포어를 release해주어 다른 프로세스가 공유 메모리에 접근 가능하도록 한다.
  7. close, unlink로 공유 메모리에 대한 엑세스를 닫고 파괴요청을 한다.
  • 실행 결과

쓰레드를 사용했을 때보다 확실히ㅂ 빨라진 결과를 확인할 수 있다.😊


Reference

멀티 프로세스 : https://docs.python.org/3/library/multiprocessing.html

공유 메모리 : https://docs.python.org/3/library/multiprocessing.shared_memory.html

세마포어 : https://docs.python.org/3/library/threading.html#semaphore-objects

0개의 댓글