[분산 시스템] 1-8. Process Shared State

gimseonjin616·2022년 1월 2일
0

분산시스템

목록 보기
9/14

각각의 프로세스 메모리는 독립적이다.

프로세스는 각각의 개체가 메모리를 할당받아 독립적인 Code, Data, Stack, Heap 영역을 가진다. 만약 1~1000까지 숫자를 더해야 하는데 총 10개의 프로세스에게 일을 맡기고 싶다면 어떻게 해야할까?

기본적으로 프로세스는 값을 공유하지 않기 때문에 1번 프로세스는 1~10까지, 2번 프로세스는 11~20까지 와 같이 범위를 지정해줘서 차후에 합치는 방법을 쓰거나 하나의 공유 메모리를 만들어서 순차적으로 메모리에 접근해 값을 더하는 방식을 쓸 수 있다.

이번 시간에는 공유 메모리를 만들어서 각각의 프로세스가 접근하는 방식을 구현해보고자 한다.

메인 프로세스 & 서브 프로세스

사용 라이브러리

# Value 와 Array는 공유 메모리를 선언하는데 사용되는 라이브러리다.
from multiprocessing import Process, current_process, Value, Array
import random
import os

메인 프로세스

def main():
    # 부모 프로세스 아이디
    parent_process_id = os.getpid()
    # 출력
    print(f"Parent process ID {parent_process_id}")

    # 프로세스 리스트  선언
    processes = list()
    
    # share_numbers = Array('i', range(50)) # i, c 등 Flag 확인 <- 여러 데이터를 공유
    share_value = Value('i', 0) # <- 단일 데이터 공유
    for _ in range(1,10):
        # 생성
        p = Process(target=generate_update_number, args=(share_value,))
        # 배열에 담기
        processes.append(p)
        # 실행
        p.start()
        
    # Join
    for p in processes:
        p.join()

    # 최종 프로세스 부모 변수 확인
    print("Final Data(share_value) in parent process",  share_value.value)

if __name__ == '__main__':
    main()

서브 프로세스

# 실행 함수
def generate_update_number(v : int):
    for i in range(50):
        v.value += 1
    print(current_process().name, "data", v.value)

실행 결과

실행 결과를 살펴보면 우리가 예상한 값 450보다 적은 405가 출력됐다. 이는 동기화 문제로 판단된다. 공유 자원인 shared_value에 다수의 프로세스가 동시에 접근하면서 데이터가 누락된 것으로 판단된다.

따라서 이때 Multiprocessing 라이브러리의 Lock 이나 Semaphore를 가져와서 접근을 제한해줘야 한다. 현재 공유 자원에는 한 개의 프로세스와 한 개의 스레드만 접근해야하기 때문에 Lock을 통해 뮤텍스로 구현해야 한다.

이때 Lock 객체는 메인 프로세스에서 구현한 후, 관리해줘야 한다. 그렇지 않으면 각각의 서브 프로세스에서 Lock을 관리하기 때문에 제대로 뮤택스화 되지 않는다!

수정된 메인 프로세스

def main():
    # 부모 프로세스 아이디
    parent_process_id = os.getpid()
    # 출력
    print(f"Parent process ID {parent_process_id}")

    # 프로세스 리스트  선언
    processes = list()

    # share_numbers = Array('i', range(50)) # i, c 등 Flag 확인
    share_value = Value('i',0)

    # Lock 객체 선언
    lock = Lock()

    for _ in range(1,10):
        # 생성
        p = Process(target=generate_update_number, args=(share_value,lock,))
        # 배열에 담기
        processes.append(p)
        # 실행
        p.start()
        
    # Join
    for p in processes:
        p.join()

    # 최종 프로세스 부모 변수 확인
    print("Final Data(share_value) in parent process",  share_value.value)

if __name__ == '__main__':
    main()

수정된 서브 프로세스

def generate_update_number(v : int, lock):
    for i in range(50):
        lock.acquire()
        v.value += 1
        lock.release()
    print(current_process().name, "data", v.value)

수정된 결과

profile
to be data engineer

0개의 댓글