서론

프로세스와 스레드의 큰 그림을 지난번 포스팅에서 알아보았다. 오늘은 파이썬의 Process 객체를 어떻게 활용하는지 구체적으로 살펴본다.

multiprocessing basic

파이썬의 multiprocessing 모듈에 대해서 배워보는 시간을 가진다.

두 번째 프로세스를 생성하고 실행하는 가장 심플한 방법은 타겟 함수를 가지는 Process 객체를 만들고 start() 메서드를 호출하는 것 이다.

# multiprocessing_simple.py
import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

위의 코드를 실행시켜면 "Worker"를 다섯 번 출력된다. 각각의 프로세스가 아웃풋 스트림에 접근하기 위해 경쟁하기 때문에 실행 순서는 불분명하다.

$ python3 multiprocessing_simple.py

Worker
Worker
Worker
Worker
Worker

보통 어떤 일을 해야할지 알려주는 인자를 넣어서 프로세스를 생성한다. threading과 다르게 Process 객체에 넘어가는 인자는 pickle을 사용하여 직렬화 할 수 있어야 한다. 아래의 예는 각 워커에게 출력할 숫자를 전달한다.

# multiprocessing_simpleargs.py

import multiprocessing

def worker(num):
    """thread worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

각 워커에 의해 정수 인자들이 출력된다.

$ python3 multiprocessing_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

import가 가능한 타겟 함수

threading 예제와 multiprocessing 예제의 다른점은 __main__에 대한 추가적인 보호이다. 새 프로세스가 시작되는 방식으로 인해, 자식 프로세스는 타겟 함수를 포함하는 스크립트를 import 할 수 있어야 한다. __main__을 검사하는 과정에서 main 부분을 래핑(wrapping)하는 것은 그 모듈이 import된 각 자식을 반복적으로 실행하지 않는다. 다른 접근법으로는 타겟 함수를 다른 스크립트로 분리하여 import 시키는 것이다. 예를 들어, multiprocessing_import_main.py는 다른 모듈에 정의된 워커 함수를 사용한다.

# multiprocessing_import_main.py

import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()

워커 함수는 multiprocessing_import_worker.py에 정의했다.

# multiprocessing_import_worker.py

def worker():
    """worker function"""
    print('Worker')
    return

출력은 다음과 같다.

python3 multiprocessing_import_main.py

Worker
Worker
Worker
Worker
Worker

현재 프로세스 (이름을) 결정하기

프로세스를 식별하거나 이름을 주기 위해 인자를 넘기는 것은 번거로운 일고 불필요한 일이다. 각 Process 인스턴스는 프로세스가 생성되면서 기본값으로 이름을 부여받는다. 프로세스에 이름이 있는 것은 동시에 여러 타입의 프로세스가 실행될 때, 그것들을 추적하는데 용이하다.

# multiprocessing_names.py
import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()

각 줄에 현재의 프로세스 이름을 포함한 값이 출력된다. Process-3 라는 이름을 가진 프로세스는 이름을 정의하지 않은 Process (worker_2) 주어진 이름이다.

$ python3 multiprocessing_names.py

worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting

데몬 프로세스

기본적으로, 메인 프로그램은 모든 하위 항목들이 실행을 마칠 때 까지 종료되지 않는다. 도중에 중단시키는 방법이 어려운 서비스 또는 작업 도중 데이터를 손실시키지 않으면서 죽어도 되는 서비스 (예를 들어, 서비스 모니터링 툴에서 사용되는 "heart beats"를 생성하는 작업) 와 같이 메인 프로그램을 막지 않고 실행되는 백그라운드를 실행할 때가 있다.

프로세스를 데몬으로 설정하려면 daemon 속성을 True 로 준다. 속성을 주지않으면 데몬이 되지 않는다.

multiprocessing_daemon.py
import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

메인 프로그램을 포함하는 모든 non-데몬 프로세스는 데몬 프로세스가 2초의 sleep 후 깨어나기 전에 종료되는 반면, 데몬 프로세스는 "Exiting" 메시지를 출력하지 않는다.

$ python3 multiprocessing_daemon.py

Starting: daemon 3915
Starting: non-daemon 3916
Exiting : non-daemon 3916

데몬 프로세스는 고아가 프로세스가 남아서 실행되지 않도록 메인 프로그램이 끝나기 전에 자동적으로 종료된다. 이것은 프로그램이 실행될 때 프로세스 아이디를 출력해보면서 확인해볼 수 있다. 그래고 ps같은 커맨드를 실행해 확인이 가능하다.

프로세스 대기하기

프로세스가 완료되어 종료되기까지 기다리게 해주는 메서드가 join()이다.

# multiprocessing_daemon_join.py

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

메인 프로세스가 join() 메서드를 사용하는 데몬이 종료되도록 기다리고 있기 때문에, "Existing" 메시지가 이 때 출력된다.

$ python3 multiprocessing_daemon_join.py

Starting: daemon
Starting: non-daemon
Exiting : non-daemon
Exiting : daemon

기본적으로 join() 메서드는 기다리게 한다. 그래서 timeout 인자를 넘겨줄 수 있다. 프로세스가 inactive하게 되는 시간을 float 숫자로 나타낸다. 단위는 초이다. 만약 프로세스가 타임아웃 시간 내에 종료하지 않는다면, join() 메서드는 그냥 리턴된다.

# multiprocessing_daemon_join_timeout.py

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)    # 종료될 때 까지 기다리지 않는단 마인드
    print('d.is_alive()', d.is_alive())
    n.join()

timeout 시간이 데몬이 sleep 하는 시간보다 적기 때문에, 프로세스는 join() 메서드가 리턴 후 여전히 "alive" 상태이다.

$ python3 multiprocessing_daemon_join_timeout.py

Starting: daemon
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True

프로세스 종료하기

포이즌 필 방식으로 프로세스를 종료해야한다는 신호를 보내는 것이 더 좋지만, 프로세스가 움직이 않게 되거나, 교착상태에 빠지면 강제로 종료하는 것이 좋다. 프로세스 객체에서 terminate() 메서드를 호출하면 자식 프로세스가 종료된다.

# multiprocessing_terminate.py

import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

Note
프로세스 관리 코드에게 객체에 프로세스 종료를 반영할 시간을 주기위해 terminate() 메서드 호출 후 join() 메서드를 호출한 것에 주목한다.

DURING: <Process name='Process-1' pid=4662 parent=4660 started> True
TERMINATED: <Process name='Process-1' pid=4662 parent=4660 started> True
JOINED: <Process name='Process-1' pid=4662 parent=4660 stopped exitcode=-SIGTERM> False

프로세스 종료 상태

프로세스가 종료할 때 생성된 상태 코드는 exitcode 속성을 통해 접근 가능하다. 아래의 테이블에 상태 코드의 종류가 적혀있다.

Exit CodeMeaning
== 0에러가 없음
> 0프로세스에 에러가 생겼고 상태코드와 함께 종료됨
< 0-1 * exitcode 의 신호와 함께 프로세스가 kill됨
# multiprocessing_exitcode.py

import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))

예외가 발생한 프로세스들은 exitcode 값이 자동으로 1이 된다.

$ python3 multiprocessing_exitcode.py

Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
     exit_error.exitcode = 1
        exit_ok.exitcode = 0
   return_value.exitcode = 0
Process raises:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/choonghee/workspace/wecode/projets-pour-blogging/process/multiprocessing_exitcode.py", line 19, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!
         raises.exitcode = 1
     terminated.exitcode = -15

Logging

동시 실행 문제를 디버깅할 때, multiprocessing 에 의해 제공되는 객체의 내부에 접근하는 것이 유용하다. 로깅을 하게 해주는 모듈-레벨 함수 log_to_stderr() 가 있다. logging을 사용해서 logger 객체를 설정하고 로그 메시지를 스탠다드 에러 채널로 보내는 핸들러를 더해준다.

# multiprocessing_log_to_stderr.py

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

기본적으로, 로깅 레벨은 NOTSET 으로 설정되어 있기 때문에 아무런 메시지도 생성지 않는다. logger를 초기화할 때 원하는 로깅 레벨을 설정하면 된다.

python3 multiprocessing_log_to_stderr.py

[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority
>= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with
priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

logger를 직접 조작 (로깅 레벨 설정 또는 핸들러 추가) 하고 싶으면, get_logger() 메서드를 사용한다.

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

multiprocessing가 사용하는 logging 설정 파일 API를 통해 logger를 설정할 수 있다.

$ python3 multiprocessing_get_logger.py

[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

프로세스를 서브클래싱하기

Process에 타겟 함수를 넣어서 start 하는 것이 가장 간단한 방법이지만, custom subclass를 만들어 활용하는 것도 가능하다.

# multiprocessing_subclass.py
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

run() 메서드를 꼭 오버라이드 해야한다.

$ python3 multiprocessing_subclass.py

In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5

Assignment

1

위에서 여러가지 예제를 공부했으니 퉁쳐보도록 하겠습니다.. 🙇🏻‍♂️

2

두개의 프로세스를 이용하여 5천만씩 증가시키고 공유 메모리를 사용해서 세마포어로 동기화 시켜 최종 값을 1억으로 만드는 코드를 구현해 보는 과제이다. 공유 메모리와 세마포어에 대해 알아보고 답안을 적어봤다.

shared_memory

shared_memory 모듈은 하나 또는 그 이상의 프로세스들이 접근 가능한 공유 메모리의 관리와 할당을 위한 SharedMemory 클래스를 제공한다.

SharedMemory(name=None, create=False, size=0)

새로운 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결할 수 있다. 각 공유 메모리에는 이름을 지정할 수 있으며, 같은 이름을 사용하는 공유 메모리 블록에 연결할 수 있다.

한 프로세스가 더 이상 공유 메모리 블록에 대한 액세스를 필요로하지 않으면 close() 메서드를 호출해야 한다. 어떤 프로세스에서도 공유 메모리 블록이 필요하지 않으면, 정리를 위해 unlink() 메서드를 호출해야 한다.

name은 문자열로 지정된 공유 메모리의 이름이다.

create는 새로운 공유 메모리 블록을 만들지(True), 기존의 블록에 연결할지(Flase) 결정한다.

size는 새로운 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정한다. 그렇게 생성된 블록의 사이즈는 요청한 크기와 정확히 같거나 더 클 수도있다. 기존 메모리 블록에 연결할 때는 무시된다.

close()
공유 메모리에 대한 액세스를 닫는다. close()를 호출해도 공유 메모리 블록은 파괴되지 않는다.

unlink()
메모리 블록이 삭제되도록 요청한다. 공유 메모리 블록에 접근하는 모든 프로세스들 중 하나가 단 한번만 호출해야 한다. 파괴를 요청한 후, 즉시 파괴될 수도 있고 그렇지 않을 수도 있다.

buf
공유 메모리 블록 내용에 대한 메모리 뷰

name
공유 메모리 블록의 고유한 이름에 대한 읽기 전용 액세스

size
공유 메모리 블록 크기(바이트)에 대한 읽기 전용 액세스

Semaphore

세마포어는 각 acquire() 호출에 의해 감소하고 각 release() 호출에 의해 증가하는 내부 카운터를 관리 한다. 카운터는 절대 0 밑으로 떨어질 수 없다. acquire()가 0임을 발견하면, 다른 스레드가 release()를 호출할 때까지 대기하면서 블록 합니다.

Semaphore(value=1)

세마포어 객체를 구현한다. 주어진 value가 0보다 작으면 ValueError가 발생한다.

acquire(blocking=True, timeout=None)
세마포어를 획득한다.

인자 없이 호출될 때:

  • 진입시 내부 카운터가 0보다 크면, 1 감소시키고 즉시 True를 반환한다.
  • 진입시 내부 카운터가 0이면, release()를 호출하여 깨울 때 까지 블록한다. 일단 깨어나면 (카운터가 0보다 크면), 카운터를 1 줄이고 True를 반환한다. release()를 호출할 때 마다 정확히 하나의 스레드가 깨어난다. 스레드가 깨어나는 순서에 의존해서는 안된다.

False로 설정한 blocking으로 호출하면 블록 하지 않는다. 인자가 없는 호출이 블록 할 것이라면, 즉시 False를 반환한다. 그렇지 않느면 인자 없이 호출할 때와 같은 작업을 수행하고 True를 반환한다.

None 이외의 timeout으로 호출하면, 최대 timeout 초 동안 블록 한다. 그 사이 획득이 완료되지 않으면, False를 반환한다. 완료되면 True를 반환한다.

release()
내부 카운터를 1 증가시키면서 세마포어를 해제한다. 진입 시 0이고 다른 스레드가 다시 0보다 커리기를 기다리고 있으면, 해당 스레드를 깨운다.

답안

오우 이건 애를 좀 먹었다. 처음에는 SharedMemory의 버퍼가 bytearray 인 줄 모르고 왜 값이 255까지 밖에 안들어가는지 고생을 했다. 다른 동기들에게 물어보니 numpy의 배열을 사용해보라고 힌트를 얻어서 해보게 되었다. 굳이 넘파이까지 사용하지 않아도 공유 메모리를 잘 사용할 수 있게 업데이트 해줬으면 좋겠다. 코드 해설은 주석으로 달아놓았다.

from multiprocessing import Process, Semaphore, shared_memory
import time
import numpy as np


def worker(id, number, a, memory_name, semaphore1):

    """
    0에서 5천만까지 1씩 증가시킨다.
    """
    increased_number = 0
    for i in range(number):
        increased_number += 1
    
    """
    세마포어로 공유 메모리의 데이터를 다른 프로세스가 
    접근하지 못하게 블록한다.
    """
    semaphore1.acquire()
    """
    같은 공유 메모리 블록을 활용한다.
    """
    same_shared_memory1 = shared_memory.SharedMemory(name=memory_name)
    """
    공유 메모리 블록의 버퍼를 활용하기 좋게 넘파이 배열로 변환한다.
    이것은 아래 코드의 공유된 넘파이 배열의 레퍼런스이다.
    그래서 아래 코드의 "b"와 여기의 "c"는 같은 배열이다.
    """
    c = np.ndarray(a.shape, dtype=a.dtype, buffer=same_shared_memory1.buf)
    """
    공유 메모리 블록에 5천만이라는 값을 증가시킨다.
    """
    c[0] += increased_number
    """
    세마포어로 다른 프로세스가 공유 메모리에 접근 가능하도록 풀어준다.
    """
    semaphore1.release()


if __name__ == "__main__":

    start_time = time.time()
    
    """
    세마포어 객체 생성
    """
    semaphore1 = Semaphore()
    """
    넘파이 배열
    숫자 하나만 담을 예정이므로 아이템이 하나인 1차원 배열을 만든다.
    """
    a = np.array([0])
    """
    넘파이 배열을 이용하여
    새로운 공유 메모리를 생성한다.
    """
    shared_memory1 = shared_memory.SharedMemory(create=True, size=a.nbytes)
    """
    공유 메모리의 버퍼를 활용하기 좋게 넘파이 배열로 변환한다.
    """
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shared_memory1.buf)
    """
    타겟 함수(worker)를 실행하는 프로세스들이다.
    """
    th1 = Process(target=worker, args=(1, 50000000, a, shared_memory1.name, semaphore1))
    th2 = Process(target=worker, args=(2, 50000000, a, shared_memory1.name, semaphore1))

    th1.start()
    th2.start()
    th1.join()
    th2.join()

    print("--- %s seconds ---" % (time.time() - start_time))

    total = b[0]

    print("total_number=",end=""), print(total)
    print("end of main")

    shared_memory1.close()
    shared_memory1.unlink()

결과는 아래와 같다.

--- 3.0317249298095703 seconds ---
total_number=100000000
end of main

깔--------끔 😎

profile
뭐든지 열심히하는 타입 😎

0개의 댓글