실무에서 내가 만든 모델을 적용하려고 하다 보면, 소요시간이 문제가 된다. 그래서 반복적인 작업을 빠르게 수행할 수 있는 방법을 찾아보다가 병렬처리에 대해 알게 되었는데, 처음에 병렬처리를 적용할 때는 방법론만 익혀서 빠르게 적용해 봤었다. 그런데 어떤 작업들은 병렬처리를 적용하면 오히려 속도가 느려지기도 하고 코드도 복잡해져서 결국 제대로 사용하진 못했다.
그러다가 최근 다시 알고리즘 속도 개선 문제 때문에 병렬처리를 적용해 보고 있는데, 앞으로도 병렬처리를 똑똑하게 사용하기 위해서는 개념부터 잘 익혀두어야 겠다는 생각이 들어서 내용을 정리해 보려 한다. 더불어, 대안으로써 병렬처리와 비교되는 비동기 처리 방법에 대해서도 공부해 보았다.
비동기에 대해서는 사실 FastAPI를 적용하면서 대략적인 개념만 알았는데, (fastAPI에 async def가 사용된다.) 개념도 잘 정리해 두고 비동기 처리 모듈(asyncio)을 사용해서 적용도 해 보려 한다.
병렬 처리의 대표적인 예는 multi-thread
와 multi-process
가 있다.
Multi-Thread란?
: 하나의 프로세스에서 여러 스레드(thread)로 자원을 공유하며, 태스크를 나누어 동시에 병렬적(parallel)으로 수행한다.Multi-Thread의 장단점
장점 단점 1. 응답성
: 작업을 분리해서 수행하므로 실시간으로 사용자에게 응답하나의 프로세스 안에서 작동하므로, 하나의 스레드에서 문제가 생기면 전체 프로세스에 영향 2. 효율성
: 속한 프로세스 내 스레드와 메모리, 자원을 공유하여 효율적으로 사용3. 경제성
: 프로세스 생성 비용보다 스레드 생성 비용이 적고, context switching이 프로세스보다 빠름
Thread.start()
스레드 활동을 시작한다. 스레드 객체 당 1번만 호출된다 (1번 이상 호출되면 RuntimeError). 객체의 run() 메서드가 별도 제어 스레드에서 호출되게 배치한다.
Thread.join()
스레드가 종료될 때까지 기다린다. join() 메서드가 호출된 스레드가 정상적으로, 혹은 예외를 통해 종료하거나 선택적 시간제한 초과가 발생할 때까지 호출하는 스레드를 블록한다.
스레드는 여러 번 join()될 수 있다.
threading
module을 사용한 multi-thread 구현
import time
import threading
def test() :
time.sleep(1)
print('Work Finished!')
if __name__ == '__main__' :
start =time.time()
threads = []
for i in range(10) : # 반복 스텝마다 쓰레드 생성 및 실행
t = threading.Thread(target=test)
t.start()
threads.append(t)
for thread in threads :
thread.join() # 실행 완료된 쓰레드를 합침
# ---- 다른 표현 방식 --------------------------------------
# threads = [None] * 10
#
# for i in range(10) : # 반복 스텝마다 프로세스 생성 및 실행
# threads[i] = threading.Thread(target=test, args=[i])
# threads[i].start()
#
# for i in range(10) :
# threads[i].join()
# -----------------------------------------------------------
print(f'Run Time : {time.time() - start :.4f}') # --> multi-thread
Origin 소요시간 10.09s ---> Multi-Thread 소요시간 1.01s (약 1/10 단축)
Concurrent 모듈을 사용한 Multi-thread 구현
이 모듈은 threading처럼 join을 해주지 않아도 된다는 것이 장점이다.
import time
from concurrent import futures
def test() :
time.sleep(1)
return 'Work Finished!'
if __name__ == '__main__' :
start = time.time()
with futures.ThreadPoolExecutor() as e :
results = [e.submit(test) for i in range(10)]
for f in futures.as_completed(results) :
print(f.result())
print(f'Run Time : {time.time() - start :.4f}') # --> multi-thread
Origin 소요시간 10.09s ---> Multi-Thread 소요시간 1.00s (약 1/10 단축)
리턴 값을 받고자 할 때(concurrent 모듈로 구현해 봄)
import time
from concurrent import futures
def test(x) :
sum_ = 1
for i in range(x[0], x[1]) : # 범위에 대한 연산 수행 (누적곱)
sum_ *= i
print('Work Finished !')
return sum_
if __name__ == '__main__' :
start =time.time()
with futures.ProcessPoolExecutor() as e :
sub = [(i, i+999) for i in range(1, 10000, 1000)] # 1부터 10000까지
result = e.map(test ,sub) # 1000 단위로 10개 끊어서 함수 연산 수행
print(sum(result)) # 결과
print(f'Run Time : {time.time() - start :.4f}') # --> multi-thread
Origin 소요시간 0.021 ---> Mutli-Thread 소요시간 0.7917
수학 연산은 단일 스레드 연산이 더 빨랐다 !
두개 이상의 다수 프로세서(CPU)가 협력적으로 하나 이상의 태스크를 동시에 병렬(parallel)로 처리하는 것이다.
Multi-Process의 장단점
장점 단점 독립성 : 독립 구조로 안전성이 높으며, 프로세스 하나에 문제가 생겨도 다른 프로세스에 영향을 끼치지 않음 작업량이 많으면(context switching) 오버헤드가 발생할 수 있음
multi-thread 예제 2과 같은 태스크
전체적인 코드 양상은 multi-threading과 비슷하다(모듈만 변경)
import time
import multiprocessing
def test() :
time.sleep(1)
print('Work Finished!')
if __name__ == '__main__' :
start =time.time()
processes = []
for i in range(10) : # 반복 스텝마다 프로세스 생성 및 실행
p = multiprocessing.Process(target=test)
p.start()
processes.append(p)
for process in processes :
process.join() # 실행 완료된 프로세스 합침
print(f'Run Time : {time.time() - start :.4f}')
Origin 소요시간 10.09s ---> Multi-process 소요시간 1.7s
ThreadPoolExecutor() →
ProcessPoolExecutor()
로 변경되었다. (multi-thread 예제 2과 같은 태스크)
import time
from concurrent import futures
def test() :
time.sleep(1)
return 'Work Finished!'
if __name__ == '__main__' :
start = time.time()
with futures.ProcessPoolExecutor() as e :
results = [e.submit(test) for i in range(10)]
for f in futures.as_completed(results) :
print(f.result())
print(f'Run Time : {time.time() - start :.4f}')
Origin 소요시간 10.09s ---> Concurrent 모듈을 사용한 Multi-Process 소요시간 1.9s
리턴 값을 받고 싶을때(Concurrent 모듈로 구현)
import time
from concurrent import futures
def test(x) :
sum_ = 1
for i in range(x[0], x[1]) : # 범위에 대한 연산 수행 (누적곱)
sum_ *= i
print('Work Finished !')
return sum_
if __name__ == '__main__' :
start =time.time()
with futures.ProcessPoolExecutor() as e :
sub = [(i, i+999) for i in range(1, 10000, 1000)] # 1부터 10000까지
result = e.map(test ,sub) # 1000 단위로 10개 끊어서 함수 연산 수행
print(sum(result)) # 결과
print(f'Run Time : {time.time() - start :.4f}') # --> multi-process
Origin 소요시간 0.0138 ---> Multi-process 소요시간 0.9
다음 연산은 단일 프로세스 연산이 더 빨랐다. 수학 연산은 cpu 연산인데 multi-thread보다 연산시간이 더 오래 걸린 이유는 무엇일까?? 🥺 context switching이 많이 발생해서,,? 개념처럼 적용이 쉽지만은 않은것 같다..
apply에 Multi-process를 적용하고자 할 때 사용할 수 있는 예제이다.
from multiprocessing import Pool
import numpy as np
import time
def parallelize_dataframe(df, func, n_cores=12):
df_split = np.array_split(df, n_cores) # core의 개수만큼 df를 나눔
pool = Pool(n_cores) # pool을 core개수만큼 생성
df = pd.concat(pool.map(func, df_split)) # 나누어진 df를 func을 적용해서 수행 및 concat
pool.close()
pool.join() # 모두 완료될 때까지 대기
return df
def calculator(df) :
start = time.time()
df['result'] = df.apply(lambda x : my_function(x['input']),axis=1)
print(f'process end : {time.time() - start}')
return df
if __name__ == '__main__' :
start = time.time()
run = parallelize_dataframe(df, calculator)
print(run)
그렇다면 멀티 스레드와 멀티 프로세스는 언제 쓰는 것이 좋을까? python에서는 GIL 정책
에 따라 멀티 스레드가 제대로 작동하지 않는 경우가 있다(단일 스레드와 별반 다르지 않은 결과).
** GIL(Global Interpreter Lock)
GIL은 python의 객체에 대한 접근을 보호하는
mutex
로, Python bytecode를 동시에 여러 스레드에서 실행하지 못하도록 막는다.python의 객체는 참조 횟수(:객체를 가리키는 참조가 몇 개 존재하는가)를 저장하는 필드를 갖고,이러한 참조 횟수가 0이 되면 GC(Garbage Collection)는 해당 객체를 메모리에서 삭제시켜 자원을 관리한다.
이 때 여러 스레드가 python 인터프리터를 동시에 실행하게 되면,Race Condition
(하나의 값에 여러 스레드가 동시에 접근하여, 값이 올바르지 않게 읽히거나 쓰일 수 있는 상태)이 발생할 수 있다. 즉, 여러 스레드가 python 인터프리터를 동시 실행할 경우, 각 객체 참조 횟수가 올바르게 관리되지 못하여 GC가 제대로 동작하지 못할 수 있다는 뜻이다. 이러한 Race Condition은, mutex를 이용하면 예방이 가능하다.
mutex
란, 멀티 스레딩 환경에서 여러 개의 스레드가 어떠한 공유 자원에 접근하기 위해 가지고 있어야 하는 일종의 열쇠와 같은 것이다. 하나의 스레드가 공유 자원에 뮤텍스를 가지고 있다면, 다른 스레드는 그 뮤텍스가 풀리기 전 까지는 기다려야 한다. GC가 올바른 동작을 하기 위해서는 모든 객체에 뮤텍스를 걸어주어야 하기 때문에, 애초에 하나의 스레드가 python 인터프리터를 실행하면 다른 스레드에서 인터프리터를 실행하지 못하도록 막는 것이다.GIL 덕분에 자원 관리를 더 쉽게 구현할 수 있었지만, 이로써 python에서는 멀티 스레딩의 효과가 없는 것 처럼 보인다. python에서 스레드가 쓸모 없는 것은 아니다. GIL이 적용되는 것은 cpu 동작에서이고 쓰레드가 cpu 동작을 마치고 I/O 작업을 실행하는 동안에는 다른 스레드가 cpu 동작을 동시에 실행할 수 있다. 따라서 cpu 동작이 많지 않고 I/O동작이 더 많은 프로그램에서는 멀티 스레드만으로 성능적으로 큰 효과를 얻을 수 있다.
정리하자면, 보편적으로 CPU 동작이 많은 작업에서는 Multi-Process
를, I/O 동작이 많은 작업에서는 Multi-Thread
를 사용하는 것이 좋다!
비동기(asynchronous) 처리
는 여러 작업을 처리하도록 예약한 뒤 작업이 끝나면 결과를 받는 방식이다. CPU 유휴 시간을 줄여 프로그램의 퍼포먼스를 높이는 것이 비동기 처리의 목적이다.
Parallelism
은 한 개의 작업을 여러 작업으로 나누어 물리적으로 동시에 진행(=여러개 cpu를 동시에)하는 것이다.Concurrency
는 여러 작업을 같은 순간에 처리하는 것이다.일반적인 함수는 콜하면 실행되고 결과값을 리턴하면 함수가 종료되는 방식이나, 코루틴(coroutine)은 suspend/resume
이 가능하다. 즉, 결과값을 바로 리턴하지 않고 suspend/yield 할 수 있으며, 중단 시점부터 resume 할 수 있다. generator
는 이러한 코루틴의 한 형태이다.
asycio
는 python에서 coroutine을 동시에(concurrently)
실행시켜 주는 모듈이다. event loop와 coroutine을 기반으로 하므로, multi-thread 대비 context switching 비용이 적게 들어간다.
asyncio 또한 multi-thread와 마찬가지로 GIL 정책에 따라, cpu 작업이 아닌 I/O 작업에서 효율적
이다.
asyncio.run()
코루틴을 실행시키고 결과를 리턴한다. 동일한 스레드에서 asyncio event loop가 실행중이면, 이 작업을 수행할 수 없다. 항상 새로운 event loop를 생성하며, 작업이 끝나면 event loop를 close한다.
asyncio.gather()
awaitable한 객체를 concurrent하게 실행시킨다.
concurrent 하게 태스크를 수행하는 예제이다(공식 문서 참조)
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({number}), currently i={i}...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f
async def main():
# Schedule three calls *concurrently*:
L = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(L)
asyncio.run(main()) # factorial 연산 수행
멀티 프로세스와 스레드는 context switching을 통해서 작업을 잘게 나눠 실행하는데 병렬성이라고 할 수 있나요?