파이썬의 multiprocessing
모듈에서는 프로세스 간의 통신 채널로 Queue
와 Pipe
를 지원합니다.
Queue
의 경우 queue
모듈의 Queue
클래스의 클론에 가깝습니다. 여러개의 프로세스가 하나의 Queue
를 바라보고, 데이터를 삽입/추출하는 방식을 통해 데이터를 전달합니다.
import time
import multiprocessing
def set_data(q):
p = multiprocessing.current_process()
msg = "Hello World"
q.put(msg) # queue에 데이터 삽입
print ("[%s] set queue data : %s" % (p.name, msg))
def get_data(q):
time.sleep(1)
p = multiprocessing.current_process()
print ("[%s] get queue data : %s" % (p.name, q.get())) # queue에서 데이터 추출
def main():
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(name="set_data", target=set_data, args=(queue,))
p1.start()
p2 = multiprocessing.Process(name="get_data", target=get_data, args=(queue,))
p2.start()
p1.join()
p2.join()
if __name__ == "__main__":
main()
프로세스 간에 데이터를 전달하기 위한 queue
를 생성하여 set_data
, get_data
두 프로세스에게 인자로 전달하였습니다. set_data
프로세스에서는 데이터를 생성하여 queue
에 삽입하고, get_data
프로세스에서는 데이터를 추출하여 출력합니다. 이처럼 프로세스 간 공유되는 queue
를 만들고 데이터를 queue
에 삽입/삭제 하는 방식을 통해 전달할 수 있습니다. 이와 같은 multiprocessing
모듈의 Queue
는 스레드와 프로세스 간의 데이터 무결성을 보장하기 때문에 무결성에 대한 조치 없이도 사용할 수 있습니다.
[set_data] set queue data : Hello World
[get_data] get queue data : Hello World
- get(block=True, timeout=None)
큐에 저장된 객체를 반환하며 삭제합니다. 인자block
이True
이면timeout
만큼 데이터가 들어올 때까지 블록하게 됩니다.timeout
이 양수인 경우,block
이True
이면timeout
만큼 블록하고 해당 시간동안 객체가 들어오지 않으면queue.Empty
예외를 발생시킵니다.- put(obj, block=True, timeout=None) :
obj
를queue
에 넣습니다.block
이True
이고timeout
이None
이면 빈 슬롯이 생길때까지 블록하게 됩니다.timeout
이 양수인 경우timeout
초만큼 블록하고 그 동안 빈 슬롯이 생기지 않으면queue.Full
예최를 발생시킵니다.
한번에 여러 프로세스와 정보를 주고 받을 수 있는 Queue
와 달리 Pipe
는 1:1 통신 방식을 지원합니다. 따라서 Pipe
를 사용하게 되면 마치 client-server 처럼 2개의 pipe
객체를 반환받고 이를 통해서 메세지를 주고 받게 됩니다.
import multiprocessing
def child(pipe):
p = multiprocessing.current_process()
msg = "Hello World"
pipe.send(msg) # 데이터 전송
print ("[%s] Send a message to pipe : %s" % (p.name, msg))
def main():
parent_pipe, child_pipe = multiprocessing.Pipe() # 2개의 pipe 객체 반환
p = multiprocessing.Process(name="child", target=child, args=(child_pipe,))
p.start()
print ("Recieved message : %s" % parent_pipe.recv()) # 데이터 수신
p.join()
if __name__ == "__main__":
main()
Pipe
생성자는 Pipe
의 양 끝에 해당하는 2개의 객체를 반환합니다. 반환된 2개의 객체를 서로 다른 프로세스에게 전달하고, 두 프로세스는 해당 pipe
객체를 통해 데이터를 주고받게 됩니다.
[child] Send a message to pipe : Hello World
Recieved message : Hello World
class multiprocessing.connection.Connection
- send(obj) : 연결의 반대편 끝에서
recv()
를 사용하여 읽을 객체를 보냅니다. 단, 전송하는 객체는 피클이 가능해야 합니다.- recv() : 연결의 반대편 끝에서
send(obj)
를 통해 보낸 객체를 반환합니다. 객체를 수신할 때까지 블록하며, 수신할 내용이 없는 상태로 반대편 끝이 닫히게 된다면EOFError
를 발생시킵니다.- close() : 연결을 닫습니다.
스레드나 프로세스와 같이 동시성을 사용하는 로직을 설계할 때는 메모리를 공유하지 않는 것이 가장 좋습니다. 동시에 하나의 메모리에 접근하여 어떤 동작을 하게된다면 메모리의 무결성을 보장하기 위한 로직을 구현해야하기 때문입니다.
하지만 메모리를 필수적으로 공유해야하는 상황이 발생할 수 있습니다. 이러한 경우를 대비하여 multiprocessing
모듈은 프로세스가 메모리를 공유할 수 있도록 Server process
를 사용하는 방법과 실제로 프로세스간에 메모리를 공유할 수 있도록 하는 방법을 제공하고 있습니다.
여러 프로세스에게 공유되는 메모리 공간을 shared memory
라고 부릅니다. multiprocessing
모듈에서는 공유된 메모리 공간에 저장할 수 있는 Value
와 Array
라는 API를 제공합니다.
import multiprocessing
def worker(num, num_list):
p = multiprocessing.current_process()
print ("[%s] num : %s" % (p.name, num.value))
for idx, value in enumerate(num_list):
print ("[%s] num list[%s] : %s" % (p.name, idx, value))
num.value = 50 # value 수정
for i in range(len(num_list)): # array 수정
num_list[i] = num_list[i] * 10
# num_list = [x*10 for x in num_list] # list comprehension 사용 불가
def main():
single_integer = multiprocessing.Value("i", 5) # Value 생성(type, 초기화 값)
integer_list = multiprocessing.Array("i", range(10)) # Array 생성(type, 초기화 값)
p = multiprocessing.Process(name="worker", target=worker, args=(single_integer, integer_list))
p.start()
p.join()
print ("num : %s" % (single_integer.value))
for idx, value in enumerate(integer_list):
print ("num list[%s] : %s" % (idx, value))
if __name__ == "__main__":
main()
main
프로세스에서는 Value
와 Array
객체를 생성시키고 worker
프로세스에 전달하였습니다. worker
프로세스에서는 전달받은 Value
와 Array
객체의 값을 출력한 후, 값을 수정한 다음 다시 출력하는 작업을 수행합니다.
Value
와 Array
API는 내부적으로 프로세스와 스레드 간의 무결성이 보장됩니다. 따라서 해당 객체들을 사용하는 프로세스에서 데이터를 조회, 수정 등 다양한 작업을 수행하더라도 별도의 무결성을 위한 로직을 구현하지 않고 프로세스끼리 공유하는 메모리 공간을 사용할 수 있습니다.
데이터를 읽기와 쓰기를 동시에 수행하는 복합대입연산자의 경우 무결성이 깨지거나, List comprehension의 경우 의도한대로 동작하지 않을 수 있습니다. 복합대입연산자의 경우 원자적 연산이 아닙니다. 따라서 공유되는 메모리에 존재하는 데이터를 원자적으로 증가시키려면 아래와 같은 방법으로 구현해야 합니다.
num.value += 10 # 원자적 방법 x
with num.get_lock(): # lock을 획득하여 원자적으로 수행하도록한다.
num.value += 10
만약 위의 방법처럼 공유 메모리의 자원을 수정하게 되는 경우, 데이터의 읽기, 쓰기 동작 사이에 다른 프로세스에서 연산을 수행하게 되어 데이터의 값이 더렵혀져 무결성이 깨지게 되는 문제가 발생할 수 있습니다.
List comprehension의 경우에도 마찬가지입니다. list comprehension
의 경우 loop을 돌며 lock
을 획득하지만 값을 초기화하기 위한 lock
은 획득하지 못하여 값이 변경되지 않게 됩니다. list comprehension
을 사용하여도 오류가 발생하지는 않지만 원하는 결과대로 동작하지 않게 되므로, shared memory
를 통한 데이터 공유시에는 주의해서 사용하여야 합니다.
서버 프로세스는 별도의 메모리 공간과 데이터를 가지고 있는 server process
가 존재하고, 이 server process
에 요청을 통해 데이터를 받아오고, 수정해서 다시 서버로 요청을 보내는 방식으로 메모리를 공유하는 방식입니다. 내부적으로 자원을 관리하는 프로세스와, 해당 프로세스의 메모리 공간에 저장된 데이터를 공유하며 자원을 관리하는 방법입니다. 공유되는 프로세스에 접근해서 데이터에 관한 동작을 할 수 있도록 하는 것이 manager API
입니다.
import multiprocessing
def print_array_or_list(name, values):
for idx, value in enumerate(values):
print ("[%s] num list[%s] : %s" % (name, idx, value))
def worker(v, a, l, d):
p = multiprocessing.current_process()
print ("[%s] value : %s, dict : %s" % (p.name, v, d["key"]))
print_array_or_list(p.name, a)
print_array_or_list(p.name, l)
v.value = 50
for i in range(len(a)):
a[i] = a[i] * 10
for i in range(len(l)):
l[i] = l[i] * 10
d["key"] = "Python3"
def main():
manager = multiprocessing.Manager() # SyncManager 객체 반환
v = manager.Value("i", 5)
a = manager.Array("i", range(10))
l = manager.list(range(10))
d = manager.dict()
d["key"] = "Python2"
p = multiprocessing.Process(name="worker", target=worker, args=(v, a, l, d))
p.start()
p.join()
main_name = "main"
print ("[%s] value : %s, dict : %s" % (main_name, v, d["key"]))
print_array_or_list(main_name, a)
print_array_or_list(main_name, l)
if __name__ == "__main__":
main()
main
프로세스에서 SyncManager
객체를 생성하고, 해당 manager
사용하여 공유되는 Value
, Array
, list
, dict
객체를 생성하고 해당 proxy
를 변수에 할당하였습니다. 이와 같은 proxy
들은 shared memory
에 존재하는 객체처럼 무결성이 보장되며, 여러 프로세스에게 공유될 수 있습니다.
Server process
는 공유되는 객체로 Event
, Locl
, Namespace
, Queue
, RLock
, Semaphore
, Array
, Value
, dict
, list
를 사용할 수 있습니다. 따라서 manager
를 통해 프로세스가 사용할 수 있는 대부분의 기능을 사용할 수 있습니다.
프로세스에서는 pool
을 만들어 작업을 분리하여 처리할 수 있습니다. 작업과 작업에 필요한 데이터를 pool
에 등록하고, 사용할 프로세스의 개수를 입력하면 작업과 작업에 필요한 데이터를 나누어서 처리하게 됩니다.
import multiprocessing
import time
def print_initial_msg(): # initializer : Process가 생성되었을 시점에 실행
print ("Start process : %s" % multiprocessing.current_process().name)
def worker(data):
time.sleep(1)
print("[%s] data : %s"%(multiprocessing.current_process().name, data))
return data * 2
def main():
# 프로세스의 개수 4개로 pool 생성
pool = multiprocessing.Pool(processes=4, initializer=print_initial_msg)
data_list = range(10)
result = pool.map(worker, data_list) # worker 함수와 data_list를 mapping
pool.close()
pool.join()
print ("Result : %s" % result)
if __name__ == "__main__":
main()
main
프로세스에서 프로세스의 개수가 4개인 pool
을 생성하였습니다. 프로세스가 생성되는 시점에 실행되는 initializer
로는 프로세스의 정보를 출력하는 print_initial_msg()
함수로 설정하였고, map()
을 통해 worker
함수와 data_list
를 매핑하여 실행하였습니다.
Start process : SpawnPoolWorker-1
Start process : SpawnPoolWorker-2
Start process : SpawnPoolWorker-3
Start process : SpawnPoolWorker-4
[SpawnPoolWorker-1] data : 0
[SpawnPoolWorker-2] data : 1
[SpawnPoolWorker-3] data : 2
[SpawnPoolWorker-4] data : 3
[SpawnPoolWorker-1] data : 4
[SpawnPoolWorker-2] data : 5
[SpawnPoolWorker-3] data : 6
[SpawnPoolWorker-4] data : 7
[SpawnPoolWorker-1] data : 8
[SpawnPoolWorker-2] data : 9
Result : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
총 4개의 프로세스가 생성되었고, 각 프로세스에서는 데이터들을 병렬적으로 처리하였습니다. 이와 같이 pool
은 단순 반복 작업들을 프로세스를 활용하여 쉽고 빠르게 처리할 수 있도록 도와주어 빅데이터 처리나 연산을 빠르고 유용하게 도와줍니다.