python multiprocessing(3) - joblib

오영주·2021년 9월 13일
2

multiprocessing을 Joblib으로 바꿔보자

1. Joblib이란?

  • joblib이란
    - python multiprocessing 모듈을 개선한 모듈
    경량 pipelining 을 활성화하면서 병렬 계산을 쉽게하고, 결과를 쉽게 디스크기반의 캐시로 사용할 수 있게 해준다.
  • joblib을 활용하면 쉽게 성능을 높일 수 있는 경우
    - 병렬적인 루프(embarrasingly parallel loop)를 처리하는데 순수 python 을 사용중일 때
    • 출력을 디스크에 저장해 세션과 세션 사이에 결과를 캐시할 수 있는데도, 부작용 없으면서도 비용만 많이 드는 함수를 호출할 때
      (부작용이 없는 함수 : 함수의 출력이 입력에 따라서만 결정되고, 다른 어떤 상태에 따라서도 결정되지 않으며, 함수가 출력값을 반환하는 행위 이외에 외부에 영향을 끼치는 다른 행위를 하지 않는 경우)
    • 프로세스사이에 numpy를 공유하고 있지만 어떻게 하는지 모르는 경우
  • joblib 기본
    - Parallel 클래스와 delayed 데커레이터를 사용한다.
    - Parallel 클래스 : multiprocessing 의 pool과 비슷한 프로세스 풀 생성한다.
    • delayed 데커레이터 : 대상 함수를 감싸서 함수가 이터레이터를 통해 인스턴스화된 Parallel객체에 접근할 수 있게 한다.

2. 몬테카를로 원주율 추정 과정에 joblib을 이용해보자

from joblib import Parallel, delayed
t1=time.time()
nbr_in_quarter_unit_circles2=Parallel(n_jobs=nbr_parallel_blocks, verbose=1) \
(delayed(estimate_nbr_points_in_quarter_circle)(nbr_samples_per_worker)\
 for sample_idx in range(nbr_parallel_blocks))
# Parallel - n_job 매개변수 - 프로세스 실행 갯수 지정
# Parallel에는 iterable 을 매개변수로 받는 __call__호출 가능 메서드가 있음
# 괄호 안에 iterable (...for sample_idx in range(nbr_parallel_blocks)) 을 넘기는 것
# 호출 가능 메서드는 각 delayed(estimate_nbr_points_in_quarter_circle) 함수를 반복하면서 함수 실행을 인자에 묶음으로 넘긴다
# (nbr_samples_per_worker)

t2=time.time()

print(nbr_in_quarter_unit_circles)
pi_estimate=sum(nbr_in_quarter_unit_circles2)*4/float(nbr_samples_in_total)
print("Estimated pi", pi_estimate)
print("Delta:", t2-t1)
  • output
[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   2 out of   6 | elapsed:   11.5s remaining:   23.0s
[Parallel(n_jobs=6)]: Done   6 out of   6 | elapsed:   11.6s finished

[13088998, 13090624, 13088555, 13091653, 13093314, 13089257]
Estimated pi 3.14165232
Delta: 11.658189058303833
Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3294

Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3295

Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3291

Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3290

Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3292

Executing estimate_nbr_points_in_quarter_circle 
    with 16,666,666.666666666 on pid 3293

3. 함수 호출 결과를 캐싱해보자

  • joblib의 메모리 캐시
    - joblib 에는 함수의 실행 결과를 저장하는 메모리캐시가 존재한다.
    • 함수의 결과를 입력 인자에 따라 디스크 캐시로 저장하는 데커레이터다.
    • 파이선 세션간에 영속적으로 유지되므로 컴퓨터를 껐다가 다음날 켜서 같은 코드를 실행해도 캐시에 저장한 결과를 사용할 수 있다.
    • 첫번째 호출을 완료하면 그 후에는 같은 인자로 같은 함수를 호출할 때 캐시에서 결과를 갖고 오게 됨 (코드를 두번째 호출하면 즉시 완료됨)
    • 단, 인자에 따라 결과를 저장하게 되기때문에, 인자에 상관없는 값 즉, random 함수등에 대한 결과를 저장하는 경우에 대해서는 주의해서 사용해야한다
from joblib import Memory
memory=Memory("./joblib_cache", verbose=0)
@memory.cache
def estimate_nbr_points_in_quarter_circle_with_idx(nbr_estimates, idx):
    print(f"""Executing estimate_nbr_points_in_qurter_circle with
    {nbr_estimates} on sample {idx} on pid {os.getpid()}""")
    nbr_trials_in_quarter_unit_circle=0
    for step in range(int(nbr_estimates)):
        x=random.uniform(0,1)
        y=random.uniform(0,1)
        is_in_unit_circle=x*x+y*y<=1.0
        nbr_trials_in_quarter_unit_circle+=is_in_unit_circle
    return nbr_trials_in_quarter_unit_circle
# 앞에서 실행한 def estimate_nbr_points_in_quarter_circle 이 함수는 서로 구분할 수 있는 인잣값을 전달하지 않음
# nbr_estimates 를 호출할 떄마다 호출 시그니쳐가 같아서 항상 같은 결과를 얻게 됨
# 그래서 인자값에 호출 인덱스를 받도록 함수를 재정의
  • 첫 번째 실행에서의 걸린 시간을 확인해보자
t1=time.time()
nbr_in_quarter_unit_circles=Parallel(n_jobs=nbr_parallel_blocks) \
(delayed(estimate_nbr_points_in_quarter_circle_with_idx) \
 (nbr_samples_per_worker, idx) for idx in range(nbr_parallel_blocks))
t2=time.time()

print(nbr_in_quarter_unit_circles)
pi_estimate=sum(nbr_in_quarter_unit_circles)*4/float(nbr_samples_in_total)
print("Estimated pi", pi_estimate)
print("Delta:", t2-t1)

output

[13091606, 13089706, 13090371, 13092312, 13090225, 13088439]
Estimated pi 3.14170636
Delta: 11.370219945907593
  • 두 번째 실행에서의 걸린 시간을 확인해보자
t1=time.time()
nbr_in_quarter_unit_circles=Parallel(n_jobs=nbr_parallel_blocks) \
(delayed(estimate_nbr_points_in_quarter_circle_with_idx) \
 (nbr_samples_per_worker, idx) for idx in range(nbr_parallel_blocks))
t2=time.time()

print(nbr_in_quarter_unit_circles)
pi_estimate=sum(nbr_in_quarter_unit_circles)*4/float(nbr_samples_in_total)
print("Estimated pi", pi_estimate)
print("Delta:", t2-t1)

output

[13091606, 13089706, 13090371, 13092312, 13090225, 13088439]
Estimated pi 3.14170636
Delta: 0.010504961013793945
profile
data scientist

0개의 댓글