프로젝트 수행 중에 Fastapi에서 작업시간이 5분이상 소요되는 작업이 존재했었다. 성능을 개선함으로써 시간을 단축하고자 한다.
이 글에서 예시로 들 SELECT문은 SIMULATION 테이블에서 해당 조건 (simulation) 을 만족하는 목록을 모두 가져오는 것이다.
기존 로직
from google.cloud import bigquery
def simulation_iteration(simulation_list): # [{"simulation_id": "2dsadwq", "simulation_id": "22dsadwq", "simulation_id": "25dsadwq"}, ...]
client = bigquery.Client() # Bigquery Client 객체
results = [] # 모든 결과를 담은 리스트
for idx, simulation in enumerate(simulation_list):
query = f"""
SELECT *
FROM SIMULATION
WHERE 1=1
AND SIMULATION_ID = '{simulation_id}';
""" # 작성할 쿼리문(SELECT .. FROM ...)
query_job = client.query(query=query, location="asia-northeast3")
results.append([row for row in query_job.result()]) # 1개의 쿼리결과가 list 형태
생성된 케이스 하나 당 10~15초 걸리기 때문에 배열의 길이가 길수록 처리시간이 매우 길어짐
MutiProcessing는 여러 개의 프로세스가 동시에 실행되는 시스템을 말한다. 각 프로세스는 독립된 메모리 공간과 자원을 가지며, 운영 체제에 의해 관리되고 스케줄링 된다.
for 문을 실행한다면, 각 쿼리마다 10~15초 가량 소요된다. 그래서 쿼리를 실행해서 결과를 받는 부분을 함수로 분리한다. 그리고 그 부분을 Pool(as pool)에 넣어서 실행하면 된다.
변경 후 소스 코드
from multiprocessing import Pool
from google.cloud import bigquery
def run_query(query: str): # 실행할 쿼리
client = bigquery.Client()
query_job = client.query(query=query, location="asia-northeast3")
return [row for row in query_job.result()]
def simulation_iteration(simulation_list): # [{"simulation_id": "2dsadwq", "simulation_id": "22dsadwq", "simulation_id": "25dsadwq"}, ...]
queries = [] # 작업들을 담을 쿼리
results = [] # 쿼리 결과들을 모두 담은 리스트
for idx, simulation in enumerate(simulation_list):
query = f"""
SELECT *
FROM SIMULATION
WHERE 1=1
AND SIMULATION_ID = '{simulation_id}';
""" # 작성할 쿼리문(SELECT .. FROM ...)
queries.append(query)
with Pool(processes=4) as pool: # 프로세스 4개로 병렬 처리 수행
results = list(pool.map(run_query, queries)) # pool.map(<실행할 함수>, <<해당 매개변수>>...)
return results
처리 완료 결과 8분 → 2분으로 단축
5 → 10개 (2분 → 1분20초)
Context Switching이란 멀티태스킹을 수행하는데, 운영 체제가 하나의 프로세스 또는 쓰레드에서 다른 프로세스나 쓰레드로 CPU의 제어를 전환하는 과정이다. 멀티태스킹 시스템에서 작업 간에 CPU 시간을 공유하고, 시스템 자원을 효율적으로 관리하기 위해 필요하다.
위와 같은 과정을 통해 여러 프로세스와 쓰레드 간에 공유하고 분배한다. 하지만 그만큼 비용이 증가하기 때문에 적절하게 사용할 수 있도록 고민이 필요하다.
위와 같이 MutiProcessing의 비용 적인 단점에 대해 보완할 방법인 MultiThreading 기법이 있다.
MultiThreading은 MutiProcessing과 다르게 한 프로세스 내에서 여러 흐름을 사용하는 것이기 때문에, 메모리 영역인 Stack만 독립되어있다.
사용법은 MutiProcessing와 거의 동일하다. Pool -> ThreadPoolExecutor 로 바꿔주면 된다.
소스코드
from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery
def run_query(query: str):
client = bigquery.Client()
query_job = client.query(query=query, location="asia-northeast3")
return [row for row in query_job.result()]
def simulation_iteration(simulation_list): # [{"simulation_id": "2dsadwq", "simulation_id": "22dsadwq", "simulation_id": "25dsadwq"}, ...]
queries = [] # 작업들을 담을 쿼리
results = []
for idx, simulation in enumerate(simulation_list):
query = f"""
SELECT *
FROM SIMULATION
WHERE 1=1
AND SIMULATION_ID = '{simulation_id}';
""" # 작성할 쿼리문(SELECT .. FROM ...)
queries.append(query)
with ThreadPoolExecutor(max_workers=16) as executor: # 객체 부분만 다르다.
results = list(executor.map(run_query, queries))
return results
실행 결과
Core 개수가 서버에서 4개의 Core를 사용하기 때문에 4의 배수로 측정하였다. 총 8번의 요청에서 해당 PC에서 사용했을 때 worker 수 대비 처리 시간을 나타냄. 16개 까지는 적어도 수행 시간이 단축되는 결과가 나왔다.
그래프를 그려보니 worker수가 12개까지 가파르게 수행 시간의 차이가 보인다.
평균 7~8분정도 되는 작업을 1분 이내로 줄임으로써 작업속도를 크게 향상시켰다. 그리고 멀티쓰레딩과 멀티프로세싱의 장단점을 고려해보며 해당 방법을 선택하였다.