풀을 만드는 방법은 크게 2가지가 있습니다.
concurrent.futures 모듈은 비동기적으로 콜러블을 실행하는 고수준 인터페이스를 제공합니다.
비동기 실행은 (ThreadPoolExecutor를 사용해서) 스레드나 (ProcessPoolExecutor를 사용해서) 별도의 프로세스로 수행 할 수 있습니다. 둘 다 추상 Executor 클래스로 정의된 것과 같은 인터페이스를 구현합니다.
Thread/Process Pool 구현에 필요한 부분만 살펴보겠습니다.
비동기적으로 호출을 실행하는 메서드를 제공하는 추상 클래스입니다. 직접 사용해서는 안 되며, 구체적인 하위 클래스를 통해 사용해야 합니다.
ThreadPoolExecutor 는 스레드 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스입니다.
Executor 객체를 이용하면 스레드 생성, 시작, 조인 같은 작업을 할 때, with 컨텍스트 관리자와 같은 방법으로 가독성 높은 코드를 구현할 수 있다.
with ThreadPoolExecutor() as executor:
future = executor.submit(함수이름, 인자)
ProcessPoolExecutor 클래스는 프로세스 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스입니다. ProcessPoolExecutor 는 multiprocessing 모듈을 사용합니다. 전역 인터프리터 록 을 피할 수 있도록 하지만, 오직 피클 가능한 객체만 실행되고 반환될 수 있음을 의미합니다.
multiprocessing.Pool.map을 통해 여러 개의 프로세스에 특정 함수를 매핑해서 병렬처리하도록 구현하는 방법
from multiprocessing import Pool
from os import getpid
def double(i):
print("I'm processing ", getpid()) # pool 안에서 이 메소드가 실행될 때 pid를 확인해 봅시다.
return i * 2
with Pool() as pool:
result = pool.map(double, [1, 2, 3, 4, 5])
print(result)
double(i)이라는 메소드가 pool을 통해 각각 다른 pid를 가진 프로세스들 위에서 multiprocess로 실행된다.
해당 코드에서 pid
에 대해 잘못된 설명을 적었었는데, Process ID
로 운영체제에서 각 프로세스에 부여하는 번호를 일컫는다고 합니다. (whdgmawkd님 댓글로 고쳤습니다!)
Future 클래스는 콜러블 객체의 비동기 실행을 캡슐화합니다. Future 인스턴스는 Executor.submit() 에 의해 생성됩니다.
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
실행 결과 값
import math
import concurrent
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
소수(prime) 판별 문제로 PRIMES 변수에 선언된 숫자들이 소수인지 아닌지 판별
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
main() 함수를 따로 작성하여 소수 판별 함수(is_prime) 호출
맵-리듀스(map-reduce) 스타일로 코드를 작성하고 map() 함수를 ProcessPoolExecutor() 인스턴스에서 생성된 executor 에서 실행
concurrent.futures 라이브러리의 프로세스 풀에서 동작하게 하기 위해 with 문을 써서 구현
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
import time
def main():
print("병렬처리 시작")
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
end = time.time()
print("병렬처리 수행 시각", end-start, 's')
start = time.time()
for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
end = time.time()
print("단일처리 수행 시각", end-start, 's')
main()
실행 결과
병렬 처리시 0.73초, 단일처리시 2.79초 발생합니다.
※이터레이터, 제너레이터
여기서 말하는 PID는 제어기법이 아니라 Process ID로 운영체제에서 각 프로세스에 부여하는 번호입니다