FastAPI에서 동기 함수 비동기로 실행하기: 이벤트 루프 블로킹 방지

낭가인·2025년 12월 2일

SKALA최종프로젝트

목록 보기
3/9

🔍 문제 상황

FastAPI로 AI Agent를 호출하는 API를 개발하던 중, 성능 문제를 발견했습니다.

# ❌ 문제가 있는 코드
async def create_report(self, request: CreateReportRequest) -> dict:
    analyzer = self._get_analyzer()

    # 동기 함수를 그냥 호출 → 이벤트 루프 블로킹!
    result = analyzer.analyze(
        target_location,
        building_info,
        asset_info,
        analysis_params
    )

    return result

문제점

  1. 이벤트 루프 블로킹: analyzer.analyze()는 동기 함수인데, async 함수 내에서 직접 호출
  2. 동시성 상실: 다른 요청들이 현재 분석이 끝날 때까지 대기해야 함
  3. 응답 시간 증가: 분석에 30초 걸리면 다른 요청도 30초 이상 대기

왜 문제가 될까?

FastAPI는 비동기 이벤트 루프를 사용합니다:

[이벤트 루프]
  ├─ 요청 A 처리 (async)
  ├─ 요청 B 처리 (async)
  └─ 요청 C 처리 (async)

하지만 동기 함수를 직접 호출하면:

[이벤트 루프]
  ├─ 요청 A 처리 중... (30초 동기 작업 실행 중)
  │   ↓ 다른 요청들 모두 대기...
  │   ↓
  │   ↓ (30초 경과)
  ├─ 요청 B 처리 시작 (뒤늦게)
  └─ 요청 C 처리 시작 (더 늦게)

💡 해결 방법: ThreadPoolExecutor

동기 함수를 별도 스레드에서 실행하여 이벤트 루프를 블로킹하지 않도록 개선합니다.

1. 필요한 모듈 임포트

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

2. Service 클래스에 ThreadPool 추가

class ReportService:
    def __init__(self):
        self._analyzer = None
        self._report_results = {}
        # 최대 4개의 worker 스레드로 ThreadPool 생성
        self._executor = ThreadPoolExecutor(max_workers=4)

max_workers=4의 의미:

  • 최대 4개의 분석 작업을 동시에 처리 가능
  • CPU 코어 수와 메모리를 고려하여 조정 (권장: CPU 코어 수 × 2)

3. 비동기 실행 패턴 적용

async def create_report(self, request: CreateReportRequest) -> dict:
    analyzer = self._get_analyzer()

    # Language 파라미터 준비
    language = request.language.value if request.language else 'ko'

    # ✅ 개선된 코드: 비동기 실행
    loop = asyncio.get_event_loop()

    # partial로 함수와 인자를 미리 바인딩
    analyze_func = partial(
        analyzer.analyze,
        target_location,
        building_info,
        asset_info,
        analysis_params,
        language=language
    )

    # ThreadPool에서 실행 (이벤트 루프는 블로킹되지 않음!)
    result = await loop.run_in_executor(self._executor, analyze_func)

    return result

🔧 핵심 개념 설명

1. asyncio.get_event_loop()

현재 실행 중인 이벤트 루프 인스턴스를 가져옵니다.

loop = asyncio.get_event_loop()

2. functools.partial()

함수와 인자를 미리 결합하여 새로운 함수를 만듭니다.

# 원본 함수
def analyze(location, building, asset, params, language):
    ...

# partial로 인자를 미리 바인딩
analyze_func = partial(
    analyze,
    location_data,
    building_data,
    asset_data,
    params_data,
    language='ko'
)

# 나중에 인자 없이 호출 가능
result = analyze_func()  # 위에서 바인딩한 인자들이 자동으로 전달됨

왜 필요한가?

run_in_executor()는 인자가 없는 callable을 받기 때문에, partial로 인자를 미리 묶어둬야 합니다.

# ❌ 이렇게는 안 됨
result = await loop.run_in_executor(
    executor,
    analyzer.analyze(location, building, ...)  # 즉시 실행되어 버림!
)

# ✅ partial로 감싸면 됨
analyze_func = partial(analyzer.analyze, location, building, ...)
result = await loop.run_in_executor(executor, analyze_func)

3. loop.run_in_executor(executor, func)

ThreadPool의 별도 스레드에서 함수를 실행하고, 완료될 때까지 await로 대기합니다.

result = await loop.run_in_executor(self._executor, analyze_func)

동작 원리:

[Main Thread - Event Loop]
  ├─ 요청 A 시작
  ├─ run_in_executor() 호출 → Worker Thread 1에 작업 전달
  ├─ 요청 B 시작 (블로킹 안 됨!)
  ├─ run_in_executor() 호출 → Worker Thread 2에 작업 전달
  └─ 요청 C 시작 (블로킹 안 됨!)

[Worker Thread 1]
  └─ analyzer.analyze() 실행 중... (30초)

[Worker Thread 2]
  └─ analyzer.analyze() 실행 중... (30초)

📊 성능 비교

Before (동기 호출)

요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2:                               ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 3:                                                               ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)

총 소요 시간: 90초

After (비동기 실행)

요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!
요청 3: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!

총 소요 시간: 30초

3배 빠른 처리! (max_workers=4 기준, 4개까지 동시 처리 가능)

🎯 실전 적용 팁

1. Worker 수 설정

import os

# CPU 코어 수에 따라 동적 설정
max_workers = min(32, (os.cpu_count() or 1) * 2)
self._executor = ThreadPoolExecutor(max_workers=max_workers)

2. Executor 정리 (Graceful Shutdown)

class ReportService:
    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=4)

    def __del__(self):
        """서비스 종료 시 ThreadPool 정리"""
        self._executor.shutdown(wait=True)

3. 타임아웃 설정

import asyncio

try:
    # 60초 타임아웃
    result = await asyncio.wait_for(
        loop.run_in_executor(self._executor, analyze_func),
        timeout=60.0
    )
except asyncio.TimeoutError:
    return {"error": "Analysis timeout"}

🚨 주의사항

1. GIL (Global Interpreter Lock)

Python의 GIL로 인해 CPU-bound 작업은 ThreadPool로 성능 개선이 제한적입니다.

  • I/O-bound 작업: ThreadPool 효과 큼 (네트워크 요청, 파일 읽기 등)
  • CPU-bound 작업: ProcessPoolExecutor 사용 권장 (CPU 집약적 계산)

우리의 경우는 LLM API 호출이 포함되어 I/O-bound이므로 ThreadPool이 효과적입니다.

2. 상태 공유 주의

여러 스레드에서 동시에 접근하는 변수는 Thread-safe해야 합니다.

# ❌ 위험: 여러 스레드에서 동시 수정
self._report_results[report_id] = result

# ✅ 안전: Lock 사용
import threading

class ReportService:
    def __init__(self):
        self._lock = threading.Lock()
        self._report_results = {}

    async def create_report(self, request):
        result = await loop.run_in_executor(...)

        with self._lock:
            self._report_results[report_id] = result

📝 전체 코드

from concurrent.futures import ThreadPoolExecutor
from functools import partial
import asyncio

class ReportService:
    def __init__(self):
        self._analyzer = None
        self._report_results = {}
        self._executor = ThreadPoolExecutor(max_workers=4)

    def _get_analyzer(self):
        if self._analyzer is None:
            from ai_agent import SKAXPhysicalRiskAnalyzer
            from ai_agent.config.settings import load_config

            config = load_config()
            self._analyzer = SKAXPhysicalRiskAnalyzer(config)
        return self._analyzer

    async def create_report(self, request: CreateReportRequest) -> dict:
        analyzer = self._get_analyzer()

        # 데이터 준비
        target_location = {...}
        building_info = {...}
        asset_info = {...}
        analysis_params = {...}
        language = request.language.value if request.language else 'ko'

        # 비동기 실행
        loop = asyncio.get_event_loop()
        analyze_func = partial(
            analyzer.analyze,
            target_location,
            building_info,
            asset_info,
            analysis_params,
            language=language
        )
        result = await loop.run_in_executor(self._executor, analyze_func)

        return result

    def __del__(self):
        self._executor.shutdown(wait=True)

🎓 핵심 요약

BeforeAfter
동기 함수를 async 함수에서 직접 호출run_in_executor()로 별도 스레드 실행
이벤트 루프 블로킹 발생이벤트 루프 블로킹 없음
요청들이 순차 처리요청들이 동시 처리 (max_workers까지)
3개 요청 = 90초3개 요청 = 30초 (3배 빠름)

핵심 패턴:

loop = asyncio.get_event_loop()
func = partial(sync_function, arg1, arg2, ...)
result = await loop.run_in_executor(executor, func)

이 패턴을 사용하면 FastAPI에서 동기 라이브러리를 사용하면서도 비동기의 장점을 그대로 누릴 수 있습니다! 🚀


📚 참고 자료

profile
안녕하세요

0개의 댓글