Python 병렬처리

snooby·2022년 11월 18일
1

⚒ Python

목록 보기
10/14
post-thumbnail

병렬처리의 필요성

이전에 분산처리에 대한 포스팅을 한 적이 있다.
그 포스팅에서도 중요하게 다루었지만 대용량 데이터를 다루기 위해서는 병렬처리 활용 방식을 필수적으로 알아두어야합니다.
데이터 전처리 방식들도 중요하지만 그 방식에 대한 연산속도나 메모리 관리도 신경써야하는 것이죠.

머신러닝이나 딥러닝 프레임워크를 활용하면 학습과정에서 function 내 병렬처리 되도록 기능을 제공하지만 데이터 전처리나 가공 과정에서 주로 사용하는 pandas에서는 해당 기능을 따로 제공하지 않고 있습니다.

Multiprocessing

파이썬 multiprocessing라이브러리의 Pool과 Process를 활용하여 병렬구조로 연산을 처리할 수 있습니다.

1. pool

Pool은 입력 받은 job을 process에 분배하여 함수 실행의 병렬처리를 도와줍니다.

먼저 어떤 값을 5제곱해주는 work_func을 정의하고, 1~12의 12개 값을 map함수를 통해 연산을 시도해봅시다. 그리고 각 실행마다 1초간 멈추며(sleep) os.getpid()로 현재 프로세스를 나타낼 겁니다.

비교를 위해 일반적으로 연산하는 것과 함께 살펴보겠습니다.

  • 일반적 연산
import time, os

def work_func(x):
    print("value %s is in PID : %s" % (x, os.getpid()))
    time.sleep(1)
    return x**5

def main():
    start = int(time.time())
    print(list(map(work_func, range(0,12))))
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

command line

value 0 is in PID : 15832
value 1 is in PID : 15832
value 2 is in PID : 15832
value 3 is in PID : 15832
value 4 is in PID : 15832
value 5 is in PID : 15832
value 6 is in PID : 15832
value 7 is in PID : 15832
value 8 is in PID : 15832
value 9 is in PID : 15832
value 10 is in PID : 15832
value 11 is in PID : 15832
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051]
***run time(sec) : 12

결과를 보면 1개의 피드(PID : 12448)가 작업을 처리하고, 1초간 멈추라고 했으므로 작업 수행까지 12초가 걸린 것을 확인 할 수 있습니다.

  • 병렬처리
import time, os
from multiprocessing import Pool

def work_func(x):
    print("value %s is in PID : %s" % (x, os.getpid()))
    time.sleep(1)
    return x**5

def main():
    start = int(time.time())
    num_cores = 4 # Process의 수
    pool = Pool(num_cores)
    print(pool.map(work_func, range(1,13)))
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

command line

value 1 is in PID : 25672
value 2 is in PID : 34824
value 3 is in PID : 34000
value 4 is in PID : 21904
value 5 is in PID : 25672
value 6 is in PID : 34824
value 7 is in PID : 34000
value 8 is in PID : 21904
value 9 is in PID : 25672
value 10 is in PID : 34824
value 11 is in PID : 34000
value 12 is in PID : 21904
[1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051, 248832]
***run time(sec) : 3

Pool안의 값으로 job을 할당받을 Process의 수를 의미한다. 위와 같이 4개의 Process에 할당하고 동일하게 map함수를 전달해본 결과,
4개의 피드(PID : 25672,34824,34000,21904)가 12개의 수를 3개씩 할당받아 작업을 수행했고 수행시간도 3초로 감소했음을 알 수 있습니다.
WOW!!!

확인했듯이 대용량의 데이터를 처리해야하는 경우 Pool을 사용해서 더 빠른 처리를 도모하는 것이 필요할 것 같습니다.

이제 pool로 병렬처리하는 방법을 배웠다면 pool을 사용해서 데이터프레임 전처리를 병렬적으로 처리해봅시다.

데이터프레임 병렬처리

import time, os
import numpy as np
import pandas as pd
from multiprocessing import Pool

def work_func(data):
    print('PID :', os.getpid())
    data['length_str'] = data['Name'].apply(lambda x : len(x))
    return data

def parallel_dataframe(df, func, num_cores):
    df_split = np.array_split(df, num_cores) # 프로세스(num_cores)만큼 데이터를 분리
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split)) # 병렬처리 명령을 내리고 다시 pd.concat으로 합쳐준다.
    # 이는 작업 완료 후에도 메모리를 계속적으로 잡아먹는 것을 방지하기 위함.
    pool.close() # 작업이 완료되면 pool을 종료
    pool.join()
    return df

def main():
    start = int(time.time())
    my_dir = r"D:\Python\kaggle\titanic\\"
    df = pd.read_csv(my_dir + "train.csv", dtype=str)
    num_cores = 4
    df = parallel_dataframe(df,work_func, num_cores)
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

command line

PID : 35312
PID : 16280
PID : 35312
PID : 16280
***run time(sec) : 1

2. Process

Process는 하나의 프로세스를 하나의 함수에 할당하여 실행합니다.

import time, os
from multiprocessing import Process

def work_func(x):
    print("value %s is in PID : %s" % (x, os.getpid()))
    return x**5

def main():
    start = int(time.time())
    procs = []    
    for num in range(1,10):
        proc = Process(target=work_func, args=(num,))
        procs.append(proc)
        proc.start()
        
    for proc in procs:
        proc.join()
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

command line

value 2 is in PID : 19676
value 3 is in PID : 21344
value 1 is in PID : 11528
value 4 is in PID : 21992
value 6 is in PID : 35212
value 5 is in PID : 11984
value 7 is in PID : 24540
value 9 is in PID : 32868
value 8 is in PID : 32832
***run time(sec) : 0

Process함수는 target=인자에 작업을 할당하고, args=(agr1,)에 인자를 할당하여 프로세스 객체를 생성합니다. 그리고 start()로 시작하여 join()으로 프로세스의 종료를 기다린다. 확인해보면 각 값들이 모두 다른 프로세스(PID)에 할당되어있죠.

profile
데이터를 가치있게 다루고 싶은 개발자 🐥

0개의 댓글