프로세스와 스레드의 큰 그림을 지난번 포스팅에서 알아보았다. 오늘은 파이썬의 Process
객체를 어떻게 활용하는지 구체적으로 살펴본다.
파이썬의 multiprocessing
모듈에 대해서 배워보는 시간을 가진다.
두 번째 프로세스를 생성하고 실행하는 가장 심플한 방법은 타겟 함수를 가지는 Process
객체를 만들고 start()
메서드를 호출하는 것 이다.
# multiprocessing_simple.py
import multiprocessing
def worker():
"""worker function"""
print('Worker')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
위의 코드를 실행시켜면 "Worker"를 다섯 번 출력된다. 각각의 프로세스가 아웃풋 스트림에 접근하기 위해 경쟁하기 때문에 실행 순서는 불분명하다.
$ python3 multiprocessing_simple.py
Worker
Worker
Worker
Worker
Worker
보통 어떤 일을 해야할지 알려주는 인자를 넣어서 프로세스를 생성한다. threading
과 다르게 Process
객체에 넘어가는 인자는 pickle
을 사용하여 직렬화 할 수 있어야 한다. 아래의 예는 각 워커에게 출력할 숫자를 전달한다.
# multiprocessing_simpleargs.py
import multiprocessing
def worker(num):
"""thread worker function"""
print('Worker:', num)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
각 워커에 의해 정수 인자들이 출력된다.
$ python3 multiprocessing_simpleargs.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
threading
예제와 multiprocessing
예제의 다른점은 __main__
에 대한 추가적인 보호이다. 새 프로세스가 시작되는 방식으로 인해, 자식 프로세스는 타겟 함수를 포함하는 스크립트를 import 할 수 있어야 한다. __main__
을 검사하는 과정에서 main 부분을 래핑(wrapping)하는 것은 그 모듈이 import된 각 자식을 반복적으로 실행하지 않는다. 다른 접근법으로는 타겟 함수를 다른 스크립트로 분리하여 import 시키는 것이다. 예를 들어, multiprocessing_import_main.py
는 다른 모듈에 정의된 워커 함수를 사용한다.
# multiprocessing_import_main.py
import multiprocessing
import multiprocessing_import_worker
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(
target=multiprocessing_import_worker.worker,
)
jobs.append(p)
p.start()
워커 함수는 multiprocessing_import_worker.py
에 정의했다.
# multiprocessing_import_worker.py
def worker():
"""worker function"""
print('Worker')
return
출력은 다음과 같다.
python3 multiprocessing_import_main.py
Worker
Worker
Worker
Worker
Worker
프로세스를 식별하거나 이름을 주기 위해 인자를 넘기는 것은 번거로운 일고 불필요한 일이다. 각 Process
인스턴스는 프로세스가 생성되면서 기본값으로 이름을 부여받는다. 프로세스에 이름이 있는 것은 동시에 여러 타입의 프로세스가 실행될 때, 그것들을 추적하는데 용이하다.
# multiprocessing_names.py
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(2)
print(name, 'Exiting')
def my_service():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(3)
print(name, 'Exiting')
if __name__ == '__main__':
service = multiprocessing.Process(
name='my_service',
target=my_service,
)
worker_1 = multiprocessing.Process(
name='worker 1',
target=worker,
)
worker_2 = multiprocessing.Process( # default name
target=worker,
)
worker_1.start()
worker_2.start()
service.start()
각 줄에 현재의 프로세스 이름을 포함한 값이 출력된다. Process-3 라는 이름을 가진 프로세스는 이름을 정의하지 않은 Process (worker_2) 주어진 이름이다.
$ python3 multiprocessing_names.py
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting
기본적으로, 메인 프로그램은 모든 하위 항목들이 실행을 마칠 때 까지 종료되지 않는다. 도중에 중단시키는 방법이 어려운 서비스 또는 작업 도중 데이터를 손실시키지 않으면서 죽어도 되는 서비스 (예를 들어, 서비스 모니터링 툴에서 사용되는 "heart beats"를 생성하는 작업) 와 같이 메인 프로그램을 막지 않고 실행되는 백그라운드를 실행할 때가 있다.
프로세스를 데몬으로 설정하려면 daemon
속성을 True 로 준다. 속성을 주지않으면 데몬이 되지 않는다.
multiprocessing_daemon.py
import multiprocessing
import time
import sys
def daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
time.sleep(2)
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
def non_daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
메인 프로그램을 포함하는 모든 non-데몬 프로세스는 데몬 프로세스가 2초의 sleep 후 깨어나기 전에 종료되는 반면, 데몬 프로세스는 "Exiting" 메시지를 출력하지 않는다.
$ python3 multiprocessing_daemon.py
Starting: daemon 3915
Starting: non-daemon 3916
Exiting : non-daemon 3916
데몬 프로세스는 고아가 프로세스가 남아서 실행되지 않도록 메인 프로그램이 끝나기 전에 자동적으로 종료된다. 이것은 프로그램이 실행될 때 프로세스 아이디를 출력해보면서 확인해볼 수 있다. 그래고 ps
같은 커맨드를 실행해 확인이 가능하다.
프로세스가 완료되어 종료되기까지 기다리게 해주는 메서드가 join()
이다.
# multiprocessing_daemon_join.py
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
메인 프로세스가 join()
메서드를 사용하는 데몬이 종료되도록 기다리고 있기 때문에, "Existing" 메시지가 이 때 출력된다.
$ python3 multiprocessing_daemon_join.py
Starting: daemon
Starting: non-daemon
Exiting : non-daemon
Exiting : daemon
기본적으로 join()
메서드는 기다리게 한다. 그래서 timeout 인자를 넘겨줄 수 있다. 프로세스가 inactive하게 되는 시간을 float 숫자로 나타낸다. 단위는 초이다. 만약 프로세스가 타임아웃 시간 내에 종료하지 않는다면, join()
메서드는 그냥 리턴된다.
# multiprocessing_daemon_join_timeout.py
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
n.start()
d.join(1) # 종료될 때 까지 기다리지 않는단 마인드
print('d.is_alive()', d.is_alive())
n.join()
timeout 시간이 데몬이 sleep 하는 시간보다 적기 때문에, 프로세스는 join()
메서드가 리턴 후 여전히 "alive" 상태이다.
$ python3 multiprocessing_daemon_join_timeout.py
Starting: daemon
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True
포이즌 필 방식으로 프로세스를 종료해야한다는 신호를 보내는 것이 더 좋지만, 프로세스가 움직이 않게 되거나, 교착상태에 빠지면 강제로 종료하는 것이 좋다. 프로세스 객체에서 terminate()
메서드를 호출하면 자식 프로세스가 종료된다.
# multiprocessing_terminate.py
import multiprocessing
import time
def slow_worker():
print('Starting worker')
time.sleep(0.1)
print('Finished worker')
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print('BEFORE:', p, p.is_alive())
p.start()
print('DURING:', p, p.is_alive())
p.terminate()
print('TERMINATED:', p, p.is_alive())
p.join()
print('JOINED:', p, p.is_alive())
Note
프로세스 관리 코드에게 객체에 프로세스 종료를 반영할 시간을 주기위해terminate()
메서드 호출 후join()
메서드를 호출한 것에 주목한다.
DURING: <Process name='Process-1' pid=4662 parent=4660 started> True
TERMINATED: <Process name='Process-1' pid=4662 parent=4660 started> True
JOINED: <Process name='Process-1' pid=4662 parent=4660 stopped exitcode=-SIGTERM> False
프로세스가 종료할 때 생성된 상태 코드는 exitcode
속성을 통해 접근 가능하다. 아래의 테이블에 상태 코드의 종류가 적혀있다.
Exit Code | Meaning |
---|---|
== 0 | 에러가 없음 |
> 0 | 프로세스에 에러가 생겼고 상태코드와 함께 종료됨 |
< 0 | -1 * exitcode 의 신호와 함께 프로세스가 kill됨 |
# multiprocessing_exitcode.py
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
funcs = [
exit_error,
exit_ok,
return_value,
raises,
terminated,
]
for f in funcs:
print('Starting process for', f.__name__)
j = multiprocessing.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
예외가 발생한 프로세스들은 exitcode
값이 자동으로 1이 된다.
$ python3 multiprocessing_exitcode.py
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0
Process raises:
Traceback (most recent call last):
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/choonghee/workspace/wecode/projets-pour-blogging/process/multiprocessing_exitcode.py", line 19, in raises
raise RuntimeError('There was an error!')
RuntimeError: There was an error!
raises.exitcode = 1
terminated.exitcode = -15
동시 실행 문제를 디버깅할 때, multiprocessing
에 의해 제공되는 객체의 내부에 접근하는 것이 유용하다. 로깅을 하게 해주는 모듈-레벨 함수 log_to_stderr()
가 있다. logging
을 사용해서 logger 객체를 설정하고 로그 메시지를 스탠다드 에러 채널로 보내는 핸들러를 더해준다.
# multiprocessing_log_to_stderr.py
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
기본적으로, 로깅 레벨은 NOTSET
으로 설정되어 있기 때문에 아무런 메시지도 생성지 않는다. logger를 초기화할 때 원하는 로깅 레벨을 설정하면 된다.
python3 multiprocessing_log_to_stderr.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority
>= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with
priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
logger를 직접 조작 (로깅 레벨 설정 또는 핸들러 추가) 하고 싶으면, get_logger()
메서드를 사용한다.
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
multiprocessing
가 사용하는 logging
설정 파일 API를 통해 logger를 설정할 수 있다.
$ python3 multiprocessing_get_logger.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
Process
에 타겟 함수를 넣어서 start 하는 것이 가장 간단한 방법이지만, custom subclass를 만들어 활용하는 것도 가능하다.
# multiprocessing_subclass.py
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print('In {}'.format(self.name))
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
run()
메서드를 꼭 오버라이드 해야한다.
$ python3 multiprocessing_subclass.py
In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5
위에서 여러가지 예제를 공부했으니 퉁쳐보도록 하겠습니다.. 🙇🏻♂️
두개의 프로세스를 이용하여 5천만씩 증가시키고 공유 메모리를 사용해서 세마포어로 동기화 시켜 최종 값을 1억으로 만드는 코드를 구현해 보는 과제이다. 공유 메모리와 세마포어에 대해 알아보고 답안을 적어봤다.
shared_memory
모듈은 하나 또는 그 이상의 프로세스들이 접근 가능한 공유 메모리의 관리와 할당을 위한 SharedMemory
클래스를 제공한다.
새로운 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결할 수 있다. 각 공유 메모리에는 이름을 지정할 수 있으며, 같은 이름을 사용하는 공유 메모리 블록에 연결할 수 있다.
한 프로세스가 더 이상 공유 메모리 블록에 대한 액세스를 필요로하지 않으면 close()
메서드를 호출해야 한다. 어떤 프로세스에서도 공유 메모리 블록이 필요하지 않으면, 정리를 위해 unlink()
메서드를 호출해야 한다.
name은 문자열로 지정된 공유 메모리의 이름이다.
create는 새로운 공유 메모리 블록을 만들지(True
), 기존의 블록에 연결할지(Flase
) 결정한다.
size는 새로운 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정한다. 그렇게 생성된 블록의 사이즈는 요청한 크기와 정확히 같거나 더 클 수도있다. 기존 메모리 블록에 연결할 때는 무시된다.
close()
공유 메모리에 대한 액세스를 닫는다. close()
를 호출해도 공유 메모리 블록은 파괴되지 않는다.
unlink()
메모리 블록이 삭제되도록 요청한다. 공유 메모리 블록에 접근하는 모든 프로세스들 중 하나가 단 한번만 호출해야 한다. 파괴를 요청한 후, 즉시 파괴될 수도 있고 그렇지 않을 수도 있다.
buf
공유 메모리 블록 내용에 대한 메모리 뷰
name
공유 메모리 블록의 고유한 이름에 대한 읽기 전용 액세스
size
공유 메모리 블록 크기(바이트)에 대한 읽기 전용 액세스
세마포어는 각 acquire()
호출에 의해 감소하고 각 release()
호출에 의해 증가하는 내부 카운터를 관리 한다. 카운터는 절대 0 밑으로 떨어질 수 없다. acquire()
가 0임을 발견하면, 다른 스레드가 release()
를 호출할 때까지 대기하면서 블록 합니다.
세마포어 객체를 구현한다. 주어진 value가 0보다 작으면 ValueError
가 발생한다.
acquire(blocking=True, timeout=None)
세마포어를 획득한다.
인자 없이 호출될 때:
True
를 반환한다.release()
를 호출하여 깨울 때 까지 블록한다. 일단 깨어나면 (카운터가 0보다 크면), 카운터를 1 줄이고 True
를 반환한다. release()
를 호출할 때 마다 정확히 하나의 스레드가 깨어난다. 스레드가 깨어나는 순서에 의존해서는 안된다.False로 설정한 blocking으로 호출하면 블록 하지 않는다. 인자가 없는 호출이 블록 할 것이라면, 즉시 False
를 반환한다. 그렇지 않느면 인자 없이 호출할 때와 같은 작업을 수행하고 True
를 반환한다.
None
이외의 timeout으로 호출하면, 최대 timeout 초 동안 블록 한다. 그 사이 획득이 완료되지 않으면, False
를 반환한다. 완료되면 True
를 반환한다.
release()
내부 카운터를 1 증가시키면서 세마포어를 해제한다. 진입 시 0이고 다른 스레드가 다시 0보다 커리기를 기다리고 있으면, 해당 스레드를 깨운다.
오우 이건 애를 좀 먹었다. 처음에는 SharedMemory
의 버퍼가 bytearray 인 줄 모르고 왜 값이 255까지 밖에 안들어가는지 고생을 했다. 다른 동기들에게 물어보니 numpy
의 배열을 사용해보라고 힌트를 얻어서 해보게 되었다. 굳이 넘파이까지 사용하지 않아도 공유 메모리를 잘 사용할 수 있게 업데이트 해줬으면 좋겠다. 코드 해설은 주석으로 달아놓았다.
from multiprocessing import Process, Semaphore, shared_memory
import time
import numpy as np
def worker(id, number, a, memory_name, semaphore1):
"""
0에서 5천만까지 1씩 증가시킨다.
"""
increased_number = 0
for i in range(number):
increased_number += 1
"""
세마포어로 공유 메모리의 데이터를 다른 프로세스가
접근하지 못하게 블록한다.
"""
semaphore1.acquire()
"""
같은 공유 메모리 블록을 활용한다.
"""
same_shared_memory1 = shared_memory.SharedMemory(name=memory_name)
"""
공유 메모리 블록의 버퍼를 활용하기 좋게 넘파이 배열로 변환한다.
이것은 아래 코드의 공유된 넘파이 배열의 레퍼런스이다.
그래서 아래 코드의 "b"와 여기의 "c"는 같은 배열이다.
"""
c = np.ndarray(a.shape, dtype=a.dtype, buffer=same_shared_memory1.buf)
"""
공유 메모리 블록에 5천만이라는 값을 증가시킨다.
"""
c[0] += increased_number
"""
세마포어로 다른 프로세스가 공유 메모리에 접근 가능하도록 풀어준다.
"""
semaphore1.release()
if __name__ == "__main__":
start_time = time.time()
"""
세마포어 객체 생성
"""
semaphore1 = Semaphore()
"""
넘파이 배열
숫자 하나만 담을 예정이므로 아이템이 하나인 1차원 배열을 만든다.
"""
a = np.array([0])
"""
넘파이 배열을 이용하여
새로운 공유 메모리를 생성한다.
"""
shared_memory1 = shared_memory.SharedMemory(create=True, size=a.nbytes)
"""
공유 메모리의 버퍼를 활용하기 좋게 넘파이 배열로 변환한다.
"""
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shared_memory1.buf)
"""
타겟 함수(worker)를 실행하는 프로세스들이다.
"""
th1 = Process(target=worker, args=(1, 50000000, a, shared_memory1.name, semaphore1))
th2 = Process(target=worker, args=(2, 50000000, a, shared_memory1.name, semaphore1))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
total = b[0]
print("total_number=",end=""), print(total)
print("end of main")
shared_memory1.close()
shared_memory1.unlink()
결과는 아래와 같다.
--- 3.0317249298095703 seconds ---
total_number=100000000
end of main
깔--------끔 😎