Python - Parallelizing a function with Pool

Human Being · May 4, 2022

Checking CPU information

Check how many CPUs are available before parallelizing

import multiprocessing as mp
# Number of logical CPUs and the name of the current process
print(mp.cpu_count(), mp.current_process().name)
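
The CPU count is usually the starting point for choosing a pool size. The following is a minimal sketch under my own assumptions: `pick_worker_count` and its `reserve` parameter are hypothetical names, not part of `multiprocessing`, and leaving one core free is only one possible policy.

```python
import multiprocessing as mp


def pick_worker_count(num_tasks, reserve=1):
    """Hypothetical helper: keep `reserve` cores free and never use
    more workers than there are tasks (always at least one worker)."""
    available = mp.cpu_count()
    return max(1, min(num_tasks, available - reserve))


if __name__ == "__main__":
    print("logical CPUs:", mp.cpu_count())
    print("workers for 12 tasks:", pick_worker_count(12))
```

Capping the worker count at the task count avoids starting processes that would have nothing to do.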

Pool

Reference: https://yganalyst.github.io/data_handling/memo_17_parallel/

import os
import time
from multiprocessing import Pool

def work_func(x):
    # Print which process handles the item, then simulate 1 second of work
    print("work_func:", x, "PID", os.getpid())
    time.sleep(1)
    return x**5

if __name__ == "__main__":
    start = int(time.time())

    # Added for Pool: spread the 12 calls over 4 worker processes
    cpu = 4
    with Pool(cpu) as pool:
        print(pool.map(work_func, range(0, 12)))

    # Serial version used for the "Before Pool" run:
    # print(list(map(work_func, range(0, 12))))
    print("***run time(sec) :", int(time.time()) - start)

Before Pool

A single process handles all the work,

so every line prints the same PID,

and processing the 12 items takes 12 seconds.

work_func: 0 PID 5592
work_func: 1 PID 5592
work_func: 2 PID 5592
work_func: 3 PID 5592
work_func: 4 PID 5592
work_func: 5 PID 5592
work_func: 6 PID 5592
work_func: 7 PID 5592
work_func: 8 PID 5592
work_func: 9 PID 5592
work_func: 10 PID 5592
work_func: 11 PID 5592
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051]
***run time(sec) : 12

After Pool

With cpu set to 4, the output appears four lines at a time
and the 12 items are processed in 3 seconds.

Items 0, 4, and 8 share the same PID.

work_func: 0 PID 5743
work_func: 1 PID 5744
work_func: 2 PID 5745
work_func: 3 PID 5746
work_func: 4 PID 5743
work_func: 5 PID 5744
work_func: 6 PID 5745
work_func: 7 PID 5746
work_func: 8 PID 5743
work_func: 9 PID 5744
work_func: 10 PID 5745
work_func: 11 PID 5746
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051]
***run time(sec) : 3
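
The 12-second and 3-second figures follow from simple arithmetic: 12 one-second tasks run in rounds of as many tasks as there are workers. A rough sketch of that estimate, assuming every task takes the same time and ignoring process startup overhead (`expected_runtime` is a hypothetical helper name):

```python
import math


def expected_runtime(num_tasks, num_workers, seconds_per_task):
    """Estimate wall-clock time when equal-length tasks run in rounds."""
    rounds = math.ceil(num_tasks / num_workers)
    return rounds * seconds_per_task


if __name__ == "__main__":
    print("1 process  :", expected_runtime(12, 1, 1), "sec")
    print("4 processes:", expected_runtime(12, 4, 1), "sec")
```

Real runs drift from this estimate when task durations vary or when starting the worker processes takes a noticeable share of the total time.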

Going deeper

Reference: https://docs.python.org/ko/3/library/multiprocessing.html#multiprocessing-programming


import multiprocessing
import random
import time

def calculate(func, args):
    # Run func(*args) and report which worker process handled it
    result = func(*args)
    return '%s: %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
    )

def calculatestar(args):
    # map()/imap() pass a single argument, so unpack the (func, args) tuple here
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

if __name__ == '__main__':
    cpu = 4
    num_of_tasks = 10

    with multiprocessing.Pool(cpu) as pool:
        TASKS = [(mul, (i, 7)) for i in range(num_of_tasks)] + \
            [(plus, (i, 8)) for i in range(num_of_tasks)]

        results = pool.map(calculatestar, TASKS)  # a list: len(results)
        async_result = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)  # an iterator: imap_it._length
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results - map():')
        for r in results:
            print('\t', r)
        print()

        print('Ordered async_results - apply_async():')
        for r in async_result:
            print('\t', r.get())
        print()

        print('Ordered results - imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results - imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

Elapsed time

num_of_tasks = 10 (a small number of tasks)

map finish 1.4051220417022705
apply_async finish 0.0005249977111816406
imap finish 4.792213439941406e-05
imap_unordered finish 3.0040740966796875e-05

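num_of_tasks = 100 (a large number of tasks)
Each time was printed right after the call returned, along with the length (count) of the result; for imap the length came back as None. Only map() waits for the work to finish, so the other three figures measure how long the call took to return, not how long the tasks took.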

map finish : 14.041080713272095, 200
apply_async finish : 0.001519918441772461, 200 (not the real run time)
imap finish : 2.09808349609375e-05, None
imap_unordered finish : 2.09808349609375e-05, None
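
To compare the methods fairly, the clock has to stop after the results have been consumed, not right after the call returns. The sketch below shows one way to measure both moments; `slow_square` and `timed` are hypothetical helpers, and the 0.05-second sleep is an arbitrary stand-in for real work.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    time.sleep(0.05)  # stand-in for real work
    return x * x


def timed(label, start):
    """Print and return the seconds elapsed since `start`."""
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.3f}s")
    return elapsed


if __name__ == "__main__":
    with Pool(4) as pool:
        start = time.perf_counter()
        pool.map(slow_square, range(20))  # blocks until all results exist
        timed("map() returned", start)

        start = time.perf_counter()
        lazy = pool.imap(slow_square, range(20))  # returns almost at once
        timed("imap() returned", start)
        results = list(lazy)  # the waiting happens here
        timed("imap() results consumed", start)
```

With this kind of measurement the imap() total should land in the same range as map(), since both have to wait for the same work.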

map()

SpawnPoolWorker-1: mul(0, 7) = 0
SpawnPoolWorker-1: mul(1, 7) = 7
SpawnPoolWorker-2: mul(2, 7) = 14
SpawnPoolWorker-2: mul(3, 7) = 21
SpawnPoolWorker-3: mul(4, 7) = 28
SpawnPoolWorker-3: mul(5, 7) = 35
SpawnPoolWorker-4: mul(6, 7) = 42
SpawnPoolWorker-4: mul(7, 7) = 49
SpawnPoolWorker-2: mul(8, 7) = 56
SpawnPoolWorker-2: mul(9, 7) = 63
SpawnPoolWorker-3: plus(0, 8) = 8
SpawnPoolWorker-3: plus(1, 8) = 9
SpawnPoolWorker-2: plus(2, 8) = 10
SpawnPoolWorker-2: plus(3, 8) = 11
SpawnPoolWorker-1: plus(4, 8) = 12
SpawnPoolWorker-1: plus(5, 8) = 13
SpawnPoolWorker-4: plus(6, 8) = 14
SpawnPoolWorker-4: plus(7, 8) = 15
SpawnPoolWorker-2: plus(8, 8) = 16
SpawnPoolWorker-2: plus(9, 8) = 17
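
In the map() output above, each worker handles two consecutive tasks in a row. That is consistent with map() handing out work in chunks. The sketch below reproduces the default chunk-size rule as I understand it from CPython's pool implementation; it is an implementation detail that may differ between versions, and both function names are my own.

```python
def default_chunksize(num_items, num_workers):
    """Approximate the chunk size map() picks when chunksize is not given
    (assumed rule: about four chunks per worker, rounded up)."""
    chunksize, extra = divmod(num_items, num_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def split_into_chunks(items, chunksize):
    """Group items into consecutive chunks of at most `chunksize` elements."""
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


if __name__ == "__main__":
    size = default_chunksize(20, 4)
    print("chunk size for 20 tasks on 4 workers:", size)
    print(split_into_chunks(list(range(20)), size))
```

Under this rule, 20 tasks on 4 workers give chunks of 2, which matches the pairs in the output, while the 12-task example earlier gets chunks of 1.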

apply_async()

SpawnPoolWorker-4: mul(0, 7) = 0
SpawnPoolWorker-1: mul(1, 7) = 7
SpawnPoolWorker-2: mul(2, 7) = 14
SpawnPoolWorker-3: mul(3, 7) = 21
SpawnPoolWorker-1: mul(4, 7) = 28
SpawnPoolWorker-2: mul(5, 7) = 35
SpawnPoolWorker-4: mul(6, 7) = 42
SpawnPoolWorker-4: mul(7, 7) = 49
SpawnPoolWorker-2: mul(8, 7) = 56
SpawnPoolWorker-1: mul(9, 7) = 63
SpawnPoolWorker-3: plus(0, 8) = 8
SpawnPoolWorker-4: plus(1, 8) = 9
SpawnPoolWorker-4: plus(2, 8) = 10
SpawnPoolWorker-1: plus(3, 8) = 11
SpawnPoolWorker-2: plus(4, 8) = 12
SpawnPoolWorker-3: plus(5, 8) = 13
SpawnPoolWorker-3: plus(6, 8) = 14
SpawnPoolWorker-4: plus(7, 8) = 15
SpawnPoolWorker-1: plus(8, 8) = 16
SpawnPoolWorker-3: plus(9, 8) = 17

imap()

SpawnPoolWorker-3: mul(0, 7) = 0
SpawnPoolWorker-3: mul(1, 7) = 7
SpawnPoolWorker-2: mul(2, 7) = 14
SpawnPoolWorker-2: mul(3, 7) = 21
SpawnPoolWorker-1: mul(4, 7) = 28
SpawnPoolWorker-4: mul(5, 7) = 35
SpawnPoolWorker-3: mul(6, 7) = 42
SpawnPoolWorker-3: mul(7, 7) = 49
SpawnPoolWorker-3: mul(8, 7) = 56
SpawnPoolWorker-1: mul(9, 7) = 63
SpawnPoolWorker-2: plus(0, 8) = 8
SpawnPoolWorker-4: plus(1, 8) = 9
SpawnPoolWorker-3: plus(2, 8) = 10
SpawnPoolWorker-1: plus(3, 8) = 11
SpawnPoolWorker-4: plus(4, 8) = 12
SpawnPoolWorker-2: plus(5, 8) = 13
SpawnPoolWorker-1: plus(6, 8) = 14
SpawnPoolWorker-1: plus(7, 8) = 15
SpawnPoolWorker-3: plus(8, 8) = 16
SpawnPoolWorker-2: plus(9, 8) = 17

imap_unordered()

SpawnPoolWorker-4: mul(0, 7) = 0
SpawnPoolWorker-3: mul(3, 7) = 21
SpawnPoolWorker-3: mul(5, 7) = 35
SpawnPoolWorker-2: mul(1, 7) = 7
SpawnPoolWorker-4: mul(4, 7) = 28
SpawnPoolWorker-3: mul(6, 7) = 42
SpawnPoolWorker-1: mul(2, 7) = 14
SpawnPoolWorker-4: mul(8, 7) = 56
SpawnPoolWorker-4: plus(1, 8) = 9
SpawnPoolWorker-3: mul(9, 7) = 63
SpawnPoolWorker-2: mul(7, 7) = 49
SpawnPoolWorker-4: plus(2, 8) = 10
SpawnPoolWorker-1: plus(0, 8) = 8
SpawnPoolWorker-1: plus(6, 8) = 14
SpawnPoolWorker-4: plus(5, 8) = 13
SpawnPoolWorker-3: plus(3, 8) = 11
SpawnPoolWorker-4: plus(8, 8) = 16
SpawnPoolWorker-2: plus(4, 8) = 12
SpawnPoolWorker-1: plus(7, 8) = 15
SpawnPoolWorker-3: plus(9, 8) = 17

Summary

Reference: https://stackoverflow.com/questions/26520781/multiprocessing-pool-whats-the-difference-between-map-async-and-imap

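  • map
    • The call takes a long time to return, but the tasks run concurrently on the specified number of processes
    • Splits the iterable into chunks and hands them to the worker processes
      • Workers do not wait for one another: a worker that finishes its chunk picks up the next one
      • map() needs the whole iterable as a list before it starts, so the full input and the full result list are held in memory
      • Watch the memory cost for very long iterables
    • It blocks until every result has been returned → use it when later steps depend on the complete result
  • apply_async
    • Runs asynchronously → the result is not available right away
      • → It returns an AsyncResult object, so the value can be read later with get() (the list holds 200 such objects right after the calls, but the function results cannot be read at that instant)
    • It takes one task per call, not a batch, so tasks are submitted one at a time and the returned objects are collected in a list
    • Each task goes to whichever process is free at that moment
  • imap
    • Returns immediately → results are not available right away → it returns an object that yields them later, in input order
    • The returned object is an IMapIterator
    • Unlike map(), it does not group the iterable into chunks by default (chunksize defaults to 1)
      • So it tends to get slower as the number of items grows
      • Setting the chunksize option to a value greater than 1 mitigates this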
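
The differences summarized above can be checked side by side. This is a minimal sketch, not a benchmark: `cube` and `run_all` are hypothetical names, and `chunksize=8` is an arbitrary value chosen only to show where the option goes.

```python
from multiprocessing import Pool


def cube(x):
    return x ** 3


def run_all(pool, items):
    """Compute the same results through four Pool methods."""
    eager = pool.map(cube, items)  # blocks, returns a list
    handles = [pool.apply_async(cube, (x,)) for x in items]  # one task per call
    from_async = [h.get() for h in handles]  # get() waits for each result
    lazy = list(pool.imap(cube, items, chunksize=8))  # ordered iterator
    # imap_unordered yields in completion order, so sort before comparing
    unordered = sorted(pool.imap_unordered(cube, items, chunksize=8))
    return eager, from_async, lazy, unordered


if __name__ == "__main__":
    items = list(range(100))
    with Pool(4) as pool:
        eager, from_async, lazy, unordered = run_all(pool, items)
    print("all agree:", eager == from_async == lazy == unordered)
```

Sorting the imap_unordered() results only works here because the cubes of 0..99 are already in ascending order; in general, pass an index through the task if the original order matters.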