[TIL] Python | Multi Processing

fore0919·2022년 4월 11일
0
post-thumbnail

GIL(Global Interpreter Lock)
파이썬의 경우에는 GIL이 존재하여 한번에 하나의 쓰레드만 계산을 실행하기 때문에 자원 관리(가비지 컬렉션)는 쉬워졌지만 멀티 코어 부분에선 한계가 있다.
따라서, 멀티 쓰레드보다는 멀티 프로세스를 사용하는 것이 효율적이다.

1️⃣ Multi Processing


  • threading 모듈로 스레드를 생성 하는 방식과 비슷한 프로세스를 생성함
  • 장점 : 시간이 오래걸리는 작업을 별도의 프로세스를 생성 후 병렬 처리하기 때문에 응답 처리속도가 빠름
  • 단점 : 프로세스는 각각 고유한 메모리 영역을 따로 가지기 때문에 멀티 스레딩 방식에 비하면 메모리 사용이 늘어남


Multi Processing 구조 - https://sebastianraschka.com/Articles/2014_multiprocessing

🤔 사용 용도

Multiprocessing 은 CPU bound 작업에서 보다 더 유리한 면을 가지고 있다.
CPU bound 작업은 이미징 관련 작업, 복잡하거나 수학적인 계산이 많은 로직, 데이터 마이닝 과 같이 CPU 사용이 더욱 많은 작업을 말한다.

I/O bound한 작업은 비동기 프로그래밍을 위한 모듈인 asyncio모듈을 이용하는 것이 더욱 효율적이다.

python 에서 이 Multiprocessing 을 이용한 병렬 처리를 구현하는 방법에는 두가지가 있다.

  • Threading과 비슷하게 multiprocessing 모듈을 이용하는 방식
  • concurrent.futures를 이용하는 방법

2️⃣ multiprocessing 모듈 사용하기


multiprocessingthreading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지

프로세스 클래스

  • multiprocessing (모듈) 에서, 프로세스는 Process 객체를 생성한 후 start() 메서드를 호출해서 스폰한다. Process 는threading.Thread 의 API를 따른다.

  • join() 메서드는 스레드가 작업을 완료할 때까지 기다린다는 것을 의미

  • is_alive() : 생성된 프로세스가 아직 실행 중인지, 아닌지 여부 판단

  • kill() : 프로세스 종료

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',)) # Process 객체를 생성하고, f 함수를 할당하고 그 인자로 ('bob',)를 넘긴다
    p.start() # start()메소드를 호출해서 spawn함
    p.join() # 프로세스가 완전히 끝나길 기다림
    
>>> hello bob    

join() 메서드

#### 사용하지 않았을 때
def work5():
    print("Sub Process start")
    time.sleep(5) 
    print("Sub Process end")

if __name__ == "__main__":
    print("Main Process start")
    proc = mp.Process(name="Sub Process", target=work5)
    proc.start() 
    print("Main Process end")
    
>>> Main Process start
>>> Main Process end
>>> Sub Process start
>>> Sub Process end    
#### 사용 했을 때 
def work4():
    print("Sub Process start")
    time.sleep(5) 
    print("Sub Process end")

if __name__ == "__main__":
    print("Main Process start")
    proc = mp.Process(name="Sub Process", target=work4)
    proc.start()
    proc.join() 
    print("Main Process end")
    
>>> Main Process start
>>> Sub Process start
>>> Sub Process end
>>> Main Process end
# 메인 프로세스가 서브 프로세스의 작업이 끝나길 기다림 

메인 프로세스

  • current_process 함수를 호출하면 현재 실행되는 프로세스에 대한 정보를 담고있는 객체를 얻을 수 있음.

  • 해당 객체의 namepid 속성에 접근하면 프로세스의 이름과 PID(Process ID)를 얻을 수 있음

    • PID란 운영체제가 각 프로세스에게 부여한 고유 번호
    • 우선 순위를 조정하거나 종료할 때와 같이 다양한 용도로 사용된다
import multiprocessing as mp

if __name__ == "__main__":
    proc = mp.current_process()
    print(proc.name)
    print(proc.pid)
    
>>> MainProcess
>>> 70336

프로세스 스포닝 (Spawning)

  • 부모 프로세스(Parent Proecess)가 운영체제에 요청하여 자식 프로세스(Child Process)를 새로 만들어내는 과정을 스포닝이라고 부름

  • multiprocessing 모듈을 사용해 부모 프로세스가 처리할 작업이 많은 경우 자식 프로세스를 새로 만들어 일부 작업을 자식 프로세스에게 위임하여 처리함

  • Process 클래스의 인스턴스를 생성할 때 생성될 자식 프로세스의 이름위임 하고자 하는 일(함수)을 전달한 후 start( ) 메소드를 호출한다.

  • daemon = True 입력 시 메인 프로세스가 종료되면 서브 프로세스도 종료된다.

    • proc = mp.Process(name="Sub Process", target=work, daemon=True)
def worker():
    print('SubProcess End!')

if __name__ == "__main__":
    p = mp.Process(name='SubProcess', target=worker) #process spawning
    p.start()
    
>>> SubProcess End!

각 프로세스의 이름과 PID 출력하기

def worker2():
    proc = mp.current_process()
    print(proc.name)
    print(proc.pid)
    time.sleep(5)
    print('SubProcess End')

if __name__ == '__main__':
    proc = mp.current_process() # main process
    print(proc.name) 
    print(proc.pid)

    p = mp.Process(name='SubProcess', target=worker2)
    p.start()

    print('MainProcess End')
    
>>> MainProcess
>>> 71091
>>> MainProcess End
>>> SubProcess
>>> 71093
(5초 후)
>>> SubProcess End   

큐 (Queue) 클래스

프로세스끼리는 자원을 공유하지 않기 때문에, 프로세스 간의 자원 전달이 필요할 때는 큐 자료구조를 사용해야 한다.

  • put() : 큐에 데이터를 넣어줌 +
  • get() : 큐에서 데이터를 가져옴 -
  • qsize() : 현재 큐의 크기 반환
  • empty() : 큐가 비어 있으면 1을 반환
  • full() : 전체가 채워져 있으면 1을 반환
def queue(q):
    q.put([5, False, 'Queue'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=queue, args=(q,))
    p.start()
    print(q.get())
    p.join()

3️⃣ multiprocessing 모듈 사용 예제

1. sleep을 이용해서 비교하기

# 일반 함수
def sleep():
    print ('sleeping 1 sec')
    time.sleep(1)
    print ('wake up')

if __name__ == '__main__':
    start_time = time.perf_counter() 
    # perf_counters는 함수를 호출하여 대기한 시간 포함하여 측정 
    # process_time은 실제로 연산하는데 걸린 시간만 측정
    for i in range(4):
        sleep()
    end_time = time.perf_counter()
    print (f"{round(end_time-start_time,2)} finished") # 4.01 finished

# 멀티프로세싱 모듈 사용
def sleep_mp():
    print('sleeping 1 sec')
    time.sleep(1)
    print('wake up')

if __name__ == '__main__':
    start_time = time.perf_counter()

    processes = [] # 프로세스를 담을 리스트 (초기화)
    for i in range(4): # for 루프를 돌며 프로세스를 만듬 
        p = mp.Process(target=sleep_mp) # 각 프로세스에 작업을 등록하기 # target 인자에 해당 프로세스가 해야할 작업(함수) 등록
        p.start()
        processes.append(p)

    for process in processes:
        process.join() # 프로세스가 끝날 때까지 다음 코드는 실행시키지 못하도록 각 프로세스마다 join 호출 
    end_time = time.perf_counter()
    print (f"{round(end_time-start_time,2)} finished") #  1.11 finished

4️⃣ Concurrency.futures 모듈 사용 예제


def sleep_curr():
    print('sleeping 1 sec')
    time.sleep(1)
    return 'wake up' # 리턴값을 이용해 출력하기 위해 print 사용하지 않음 
 
if __name__ == '__main__':
 
    start_time = time.perf_counter()
 
    with concurrent.futures.ProcessPoolExecutor() as executor: # with문으로 ProcessPoolExecutor클래스의 인스턴스(excutor)만들어줌. 
        futures = [executor.submit(sleep_curr) for _ in range(4)] # submit을 이용해 프로세스 Pool에 작업 제출 ->future 객체 반환
 
    for future in futures:
        print(future.result()) # 작업 결과를 Future 객체의 result 메서드를 호출하여 얻음 
 
    end_time = time.perf_counter()
    print(f"{round(end_time-start_time,2)} finished") # 1.15 finished

참고 자료 (Reference)


https://docs.python.org/ko/3/library/multiprocessing.html
https://niceman.tistory.com/145
https://zephyrus1111.tistory.com/113
https://wikidocs.net/85603

profile
Back-end 개발자

0개의 댓글