FastAPI로 AI Agent를 호출하는 API를 개발하던 중, 성능 문제를 발견했습니다.
# ❌ 문제가 있는 코드
async def create_report(self, request: CreateReportRequest) -> dict:
analyzer = self._get_analyzer()
# 동기 함수를 그냥 호출 → 이벤트 루프 블로킹!
result = analyzer.analyze(
target_location,
building_info,
asset_info,
analysis_params
)
return result
analyzer.analyze()는 동기 함수인데, async 함수 내에서 직접 호출FastAPI는 비동기 이벤트 루프를 사용합니다:
[이벤트 루프]
├─ 요청 A 처리 (async)
├─ 요청 B 처리 (async)
└─ 요청 C 처리 (async)
하지만 동기 함수를 직접 호출하면:
[이벤트 루프]
├─ 요청 A 처리 중... (30초 동기 작업 실행 중)
│ ↓ 다른 요청들 모두 대기...
│ ↓
│ ↓ (30초 경과)
├─ 요청 B 처리 시작 (뒤늦게)
└─ 요청 C 처리 시작 (더 늦게)
동기 함수를 별도 스레드에서 실행하여 이벤트 루프를 블로킹하지 않도록 개선합니다.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
class ReportService:
def __init__(self):
self._analyzer = None
self._report_results = {}
# 최대 4개의 worker 스레드로 ThreadPool 생성
self._executor = ThreadPoolExecutor(max_workers=4)
max_workers=4의 의미:
async def create_report(self, request: CreateReportRequest) -> dict:
analyzer = self._get_analyzer()
# Language 파라미터 준비
language = request.language.value if request.language else 'ko'
# ✅ 개선된 코드: 비동기 실행
loop = asyncio.get_event_loop()
# partial로 함수와 인자를 미리 바인딩
analyze_func = partial(
analyzer.analyze,
target_location,
building_info,
asset_info,
analysis_params,
language=language
)
# ThreadPool에서 실행 (이벤트 루프는 블로킹되지 않음!)
result = await loop.run_in_executor(self._executor, analyze_func)
return result
asyncio.get_event_loop()현재 실행 중인 이벤트 루프 인스턴스를 가져옵니다.
loop = asyncio.get_event_loop()
functools.partial()함수와 인자를 미리 결합하여 새로운 함수를 만듭니다.
# 원본 함수
def analyze(location, building, asset, params, language):
...
# partial로 인자를 미리 바인딩
analyze_func = partial(
analyze,
location_data,
building_data,
asset_data,
params_data,
language='ko'
)
# 나중에 인자 없이 호출 가능
result = analyze_func() # 위에서 바인딩한 인자들이 자동으로 전달됨
왜 필요한가?
run_in_executor()는 인자가 없는 callable을 받기 때문에, partial로 인자를 미리 묶어둬야 합니다.
# ❌ 이렇게는 안 됨
result = await loop.run_in_executor(
executor,
analyzer.analyze(location, building, ...) # 즉시 실행되어 버림!
)
# ✅ partial로 감싸면 됨
analyze_func = partial(analyzer.analyze, location, building, ...)
result = await loop.run_in_executor(executor, analyze_func)
loop.run_in_executor(executor, func)ThreadPool의 별도 스레드에서 함수를 실행하고, 완료될 때까지 await로 대기합니다.
result = await loop.run_in_executor(self._executor, analyze_func)
동작 원리:
[Main Thread - Event Loop]
├─ 요청 A 시작
├─ run_in_executor() 호출 → Worker Thread 1에 작업 전달
├─ 요청 B 시작 (블로킹 안 됨!)
├─ run_in_executor() 호출 → Worker Thread 2에 작업 전달
└─ 요청 C 시작 (블로킹 안 됨!)
[Worker Thread 1]
└─ analyzer.analyze() 실행 중... (30초)
[Worker Thread 2]
└─ analyzer.analyze() 실행 중... (30초)
요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 3: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
총 소요 시간: 90초
요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!
요청 3: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!
총 소요 시간: 30초
3배 빠른 처리! (max_workers=4 기준, 4개까지 동시 처리 가능)
import os
# CPU 코어 수에 따라 동적 설정
max_workers = min(32, (os.cpu_count() or 1) * 2)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
class ReportService:
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=4)
def __del__(self):
"""서비스 종료 시 ThreadPool 정리"""
self._executor.shutdown(wait=True)
import asyncio
try:
# 60초 타임아웃
result = await asyncio.wait_for(
loop.run_in_executor(self._executor, analyze_func),
timeout=60.0
)
except asyncio.TimeoutError:
return {"error": "Analysis timeout"}
Python의 GIL로 인해 CPU-bound 작업은 ThreadPool로 성능 개선이 제한적입니다.
우리의 경우는 LLM API 호출이 포함되어 I/O-bound이므로 ThreadPool이 효과적입니다.
여러 스레드에서 동시에 접근하는 변수는 Thread-safe해야 합니다.
# ❌ 위험: 여러 스레드에서 동시 수정
self._report_results[report_id] = result
# ✅ 안전: Lock 사용
import threading
class ReportService:
def __init__(self):
self._lock = threading.Lock()
self._report_results = {}
async def create_report(self, request):
result = await loop.run_in_executor(...)
with self._lock:
self._report_results[report_id] = result
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import asyncio
class ReportService:
def __init__(self):
self._analyzer = None
self._report_results = {}
self._executor = ThreadPoolExecutor(max_workers=4)
def _get_analyzer(self):
if self._analyzer is None:
from ai_agent import SKAXPhysicalRiskAnalyzer
from ai_agent.config.settings import load_config
config = load_config()
self._analyzer = SKAXPhysicalRiskAnalyzer(config)
return self._analyzer
async def create_report(self, request: CreateReportRequest) -> dict:
analyzer = self._get_analyzer()
# 데이터 준비
target_location = {...}
building_info = {...}
asset_info = {...}
analysis_params = {...}
language = request.language.value if request.language else 'ko'
# 비동기 실행
loop = asyncio.get_event_loop()
analyze_func = partial(
analyzer.analyze,
target_location,
building_info,
asset_info,
analysis_params,
language=language
)
result = await loop.run_in_executor(self._executor, analyze_func)
return result
def __del__(self):
self._executor.shutdown(wait=True)
| Before | After |
|---|---|
| 동기 함수를 async 함수에서 직접 호출 | run_in_executor()로 별도 스레드 실행 |
| 이벤트 루프 블로킹 발생 | 이벤트 루프 블로킹 없음 |
| 요청들이 순차 처리 | 요청들이 동시 처리 (max_workers까지) |
| 3개 요청 = 90초 | 3개 요청 = 30초 (3배 빠름) |
핵심 패턴:
loop = asyncio.get_event_loop()
func = partial(sync_function, arg1, arg2, ...)
result = await loop.run_in_executor(executor, func)
이 패턴을 사용하면 FastAPI에서 동기 라이브러리를 사용하면서도 비동기의 장점을 그대로 누릴 수 있습니다! 🚀