Ray를 사용한 병렬 처리

Michael Kim·2022년 6월 28일
1

0. Intro

이전 회사에 있었을 때, 몇 만 장의 이미지와 그에 대한 라벨링 데이터를 처리하면서 속도가 너무 낮아 애를 쓴 적이 있다. 이때부터 파이썬의 성능 개선 필요성을 느끼고 있었다.

그리고 앞으로 AI Engineer로 일하면서 python을 많이 사용하게 될텐데, 이에 Ray는 업무 효율을 높여주는 데에 도움이 많이 될 것 같다는 생각이 들었다.

파이썬에서는 Threading, Multiprocessing 그리고 Asyncio를 지원한다.

  • Threading은 하나의 cpu, 프로세스 안에서만 동작하며, 여러 동작을 조금씩 나누어서 동작하는 원리로 속도 개선은 되지 않는다.
  • Asyncio의 경우엔, 비동기 처리를 지원하여 threading이랑 비슷하게 동작하는 것 같지만, I/O 처리 시간에 다른 코드를 실행시키며 동작 시간은 단축시킨다는 차이점이 있다. 예를 들어 request 모듈을 사용하여 다른 곳에 정보를 요청할 때, 요청하고 응답 받는 사이 시간에 짬을 내서 코드를 작동시킨다.
  • Multiprocessing은 파이썬의 GIL(Global Interpreter Lock) 제약 없이 여러 프로세스를 사용할 수 있게 해준다. 하지만, 각 프로세스끼리 데이터를 전달하고 처리하는 과정이 많아짐에 따라 동작 시간을 더 늘어나게 되는 경우도 있다.

이러한 모듈에 비해 Ray는 직렬화 오버헤드 문제 없이, 간단한 코드를 추가하는 것만으로 병렬 처리를 가능하게 한다. (Ray는 윗 모듈과 완전히 다른 라이브러리가 아닌, 내부에서 threading, asyncio, multiprocessing 모두 사용하여 효율을 최적화시킨 라이브러리로 보인다.)

1. Ray - Task Test

다음은 매개변수 n을 받아, n초 만큼 쉬고 n^2을 반환하는 함수다.

import time


def function(n):
    time.sleep(n)
    return n*n

print("start")
start = time.time()

results = []
for n in range(1, 6):
    results.append(function(n))
print("결과값: ", results)
    
end = time.time()
print("실행 시간: ",end - start)

============
start
결과값:  [1, 4, 9, 16, 25]
실행 시간:  15.124118328094482
>>>

이 함수에 다음과 같이 @ray.remote 데코레이터만 입히는 것으로 병렬 처리가 가능해진다. 함수에 인수를 넣을 때는 remote안에 넣어 작동하며, remote 함수를 호출하면 Object(Future 객체) Ref(공유 메모리 주소)를 반환한다. 그리고 ray.get을 통해 Object를 실행시킬 수 있다. 주의할 점은 ray.get에 넣은 Object Ref들이 가리키는 작업들이 모두 끝날 때까지 다음 코드를 실행하지 않는다. (참고: Antipattern: Processing results in submission order using ray.get)

import time
import ray


ray.init()

@ray.remote
def ray_function(n):
    time.sleep(n)
    return n*n

start = time.time()
print("ray start")

obj_refs = []
for n in range(1, 6):
    obj_refs.append(ray_function.remote(n))
    
results = ray.get(obj_refs)

print("결과값: ", results)
print("실행 시간: ",time.time() - start)
============
ray start
결과값:  [1, 4, 9, 16, 25]
실행 시간:  6.043393850326538
>>>

ray.wait를 사용하면 준비된 Object Ref와 그렇지 않은 Object Ref를 반환받을 수 있다. 이를 통해 get에 준비된 Object Ref를 원하는 만큼씩만 넣고 중간 결과값을 반환받을 수 있다.

import time
import ray


ray.init()


@ray.remote
def ray_function(n):
    time.sleep(n)
    return n*n

start = time.time()
print("ray start")

obj_refs = []
for n in range(1, 6):
    obj_refs.append(ray_function.remote(n))

results = []
total_task_size = len(obj_refs)
task_time_check = time.time()
while obj_refs:
    done, obj_refs = ray.wait(obj_refs)
    result = ray.get(done[0])
    print(f'[{total_task_size-len(ret)}/{total_task_size}] 결과값: {result}, 실행 시간: {time.time()-task_time_check}')
    results.append(result)
    task_time_check = time.time()

print("총 결과값: ", results)    
print("총 실행 시간: ",time.time() - start)

============
ray start
[1/5] 결과값: 1, 실행 시간: 1.020899772644043
[2/5] 결과값: 4, 실행 시간: 1.0051283836364746
[3/5] 결과값: 9, 실행 시간: 0.9929628372192383
[4/5] 결과값: 16, 실행 시간: 1.0078113079071045
[5/5] 결과값: 25, 실행 시간: 2.0091397762298584
총 결과값:  [1, 4, 9, 16, 25]
총 실행 시간:  6.051797389984131
>>>

Ref.

profile
정리하고 복습하고 일기도 쓰고

0개의 댓글