[분산 시스템] 1-9. Queue & Pipe

gimseonjin616·2022년 1월 4일
0

분산시스템

목록 보기
10/14

Queue In Process

Thread에서 생산자 소비자 패턴을 학습할 때 처음으로 Queue를 사용했다. 간단하게 복습하면 Queue는 선입선출 방식의 자료구조로 주로 데이터를 전달할 때 많이 사용되며 멀티프로세스에서도 사용 가능하다

메인 프로세스 & 서브 프로세스

라이브러리

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, Manager
import time
import os

메인 프로세스

def main():

    # 부모 프로세스 아이디
    parent_process_id = os.getpid()
    # 출력
    print(f"Parent process ID {parent_process_id}")

    # 시작 시간
    start_time = time.time()

    m = Manager()
    q = m.Queue()

    with ProcessPoolExecutor(max_workers=5) as executor:
        for _ in range(10):
            executor.submit(worker, 1, 100000000, q)

    # 순수 계산 시간
    print("--- %s seconds ---" % (time.time() - start_time))

    # 종료 플래그
    q.put('exit')

    total = 0

    # 대기
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp

    print()

    print("Main-Processing Total_count={}".format(total))
    print("Main-Processing Done!")

if __name__ == "__main__":
    main()
   

서브 프로세스

# 실행 함수
def worker(id, baseNum, q):

    process_id = os.getpid()
    process_name = current_process().name

    # 누적
    sub_total = 0

    # 계산
    for i in range(baseNum):
        sub_total += 1

    # Produce
    q.put(sub_total)

    # 정보 출력
    print(f"Process ID: {process_id}, Process Name: {process_name}")
    print(f"Result : {sub_total}")

실행 결과

Pipe in Process

파이프라인은 프로세스 간의 통신을 할 수 있도록 통로를 열어주는 개념이다. 소켓 통신과 비슷한데 1대1 통신을 기본으로 한다.

메인 프로세스 & 서브 프로세스

라이브러리

from multiprocessing import Process, Pipe, current_process
import time
import os

메인 프로세스

def main():

    # 부모 프로세스 아이디
    parent_process_id = os.getpid()
    # 출력
    print(f"Parent process ID {parent_process_id}")

    # 시작 시간
    start_time = time.time()

    # Pipe 선언
    parent_conn, child_conn = Pipe()

     # 프로세스 생성 및 실행
    
    # 생성
    t = Process(target=worker, args=(1, 100000000, child_conn))

    # 시작
    t.start()

    # Join
    t.join()

    # 순수 계산 시간
    print("--- %s seconds ---" % (time.time() - start_time))

    print()

    print("Main-Processing : {}".format(parent_conn.recv()))
    print("Main-Processing Done!")

if __name__ == "__main__":
    main()

서브 프로세스

# 실행 함수
def worker(id, baseNum, conn):

    process_id = os.getpid()
    process_name = current_process().name

    # 누적
    sub_total = 0

    # 계산
    for _ in range(baseNum):
        sub_total += 1

    # Produce
    conn.send(sub_total)
    conn.close()

    # 정보 출력
    print(f"Process ID: {process_id}, Process Name: {process_name}")
    print(f"Result : {sub_total}")

실행 결과

profile
to be data engineer

0개의 댓글