FastAPI에서 진행 상황을 전송하기 (근데, 동시성처리를 곁들인...)

반야·2024년 10월 5일
0

글을 작성하기 전에 미리 말을 하자면, 학교에서 진행한 프로젝트에서 사용한 코드라 야매라는 점을 꼭 말하고 싶다. 실제로 아래처럼 굴린다면 무슨 문제가 생길 지 장담하지 못한다...



🫠 FastAPI에서 진행 상황을 전송하려는 이유

  1. 현재 프로젝트에서 사용하는 API가 여러 단계의 API를 연속적으로 호출하는 형태임.
    • 예를 들자면, '파스타 요리 API'를 만드는데, 그 API 내부에서 '재료 준비 API', '물 끓이기 API', '면 삶기 API', '소스 만들기 API', ...를 연속적으로 호출하는 상황.

  2. 프론트엔드 측에서 현재 진행 상황을 공유받고 싶어함.
    • 즉, 내 파스타가 어느 단계까지 만들어 졌는지, 그 단계까지 진행한 결과는 어떻게 되는지를 표시하고자 함. (현재, 재료 준비를 완료했고, 무슨무슨 재료를 준비했다 등)


😵 문제 상황

우리는 파스타 네 개를 동시에 만들고 API로 각 파스타의 진행 상황을 공유받고 싶었다.


🤪 기존 처리 방식

기존 처리 방식은 '파스타 네 개를 동시에 만드는 것'을 프론트에게 떠넘기는 작전이었다. (아무래도 내가 프론트와 백 모두 하고 있었기에 가능한 작전이 아니었을까...)

장점

  • FastAPI에서 StreamingResponse로 진행 상황을 전송하는 코드를 GPT에게서 얻을 수 있었다. (구현이 쉬웠음)

단점

  • 프론트엔드에서 동시성 처리를 하는 것이 일반적이지 않다.
  • 프론트엔드 작업을 Streamlit로 해서 Python을 사용했기에 동시성 처리가 가능했지, 아마 React에서 TS를 썼다면 불가능 했을지도...

기존 프론트엔드

  1. Thread를 이용해 동시에 4개의 API에 요청
  2. 각 API의 진행 상황을 Thread ID를 추가해 Queue에 Put
  3. Queue에서 Get하면서 각 파스타의 진행 상황을 프론트엔드에 표시
  • 이때 중요한 것은 각 파스타 요리의 성공/실패 여부를 확인해서, API 응답이 종료되었는지를 감지해야한다.

기존 백엔드 코드 일부

import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

import json
import httpx
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT") 
# 배포 시를 고려해 .env를 이용한 전역 변수 사용

router = APIRouter()

async def pasta_cooking(***):
    try:
        #### 재료 준비 ####
        query = {***}
        query_string = urlencode(query)
        
        url = f"{public_api_endpoint}/material/?{query_string}"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 재료 준비 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        yield json.dumps({
            "step": "재료 준비",
            "result": result
        })+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시
        
        
        #### 물 끓이기 ####
        url = f"{public_api_endpoint}/water_boil/"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 물 끓이기 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        yield json.dumps({
            "step": "물 끓이기",
            "result": result
        })+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시


        #### 이후 내용 중략... ####
        

    except Exception as e:
        yield json.dumps({
            "step": "파스타 요리 실패",
            "result": str(e),
        })+ "\n"
        

@router.get(
    "/pasta/",
    summary="***",
    description="***",
    tags=["Pasta"],
    responses={
        200: {
            "description": "***",
            "content": {
                "application/json": {
                    "example": {
                        "step": "",
                        "result": {
                            "": ""
                        },
                    }
                }
            },
        },
        422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
            "description": "Fail to cooking a pasta",
            "content": {
                "application/json": {
                    "example": {
                        "step": "파스타 요리 실패",
                        "result": "A error message",
                    }
                }
            },
        },
    },
)
async def pasta(***):
    return StreamingResponse(
        pasta_cooking(***), 
        media_type="text/event-stream"
    )

🤯 새로운 처리 방식

새로운 처리 방식을 들여 온 이유는 단지 배포한 Streamlit 웹앱에서 문제가 생겼기 때문이다. (근데 문제 원인은 프론트에 작업을 떠넘긴 것이 아니라, '파스타 요리 실패' 응답을 보내는 로직이 모종의 이유로 작동하지 않았기 때문이었다...)


장점

  • 백엔드에서 동시성 처리를 해서 프론트엔드로 보내는 것이 일반적인 방식이다.

단점

  • FastAPI에서 StreamingResponse로 진행 상황을 전송하는 함수를 동시성 처리하는 코드의 예시를 찾을 수 없었다. (구현이 어려웠음)

새로운 프론트엔드 코드 일부

import os
import json
import requests
import sseclient
from dotenv import load_dotenv
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT")

query = {***}
query_string = urlencode(query)

url = (f"{public_api_endpoint}/pasta/?{query_string}&pasta_num=4"
with requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) as response:
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.event == "end":
            break # API 응답 종료
        if event.data:
            client_data = json.loads(event.data)
            ## 내용 생략 ##

새로운 백엔드 코드 일부

방식 자체는 '기존 프론트엔드'처럼 동시에 함수를 실행하되, 각각의 진행 상황을 ID를 추가해 Queue에 저장하고 빼내는 방식으로 유사함

import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

import json
import httpx
import asyncio
from typing import AsyncGenerator
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT") 
# 배포 시를 고려해 .env를 이용한 전역 변수 사용

router = APIRouter()

async def pasta_cooking(pasta_id: int, queue: asyncio.Queue, ***):
    try:
        #### 재료 준비 ####
        query = {***}
        query_string = urlencode(query)
        
        url = f"{public_api_endpoint}/material/?{query_string}"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 재료 준비 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        await queue.put({
        	"id": pasta_id,
            "step": "재료 준비",
            "result": result
        })
        
        
        #### 물 끓이기 ####
        url = f"{public_api_endpoint}/water_boil/"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 물 끓이기 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        await queue.put({
            "id": pasta_id,
            "step": "물 끓이기",
            "result": result
        })


        #### 이후 내용 중략... ####
        

    except Exception as e:
        await queue.put({
            "id": pasta_id,
            "step": "파스타 요리 실패",
            "result": str(e),
        })


async def n_pasta_cooking(
    queue: asyncio.Queue, pasta_num: int
) -> AsyncGenerator[str, None]:
    pasta_finish = 0 # 실패/완성된 파스타 수
    while pasta_finish < pasta_num:
        data = await queue.get()
        if data["step"] == "파스타 요리 실패" or data["step"] == "파스타 요리 완성":
            pasta_finish += 1
        yield f"data: {json.dumps(data)}\n\n"

    yield "event: end\ndata: Stream closed\n\n" # 파스타 요리 종료


@router.get(
    "/pasta/",
    summary="***",
    description="***",
    tags=["Pasta"],
    responses={
        200: {
            "description": "***",
            "content": {
                "application/json": {
                    "example": {
                    	"id": 0,
                        "step": "",
                        "result": {
                            "": ""
                        },
                    }
                }
            },
        },
        422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
            "description": "Fail to cooking a pasta",
            "content": {
                "application/json": {
                    "example": {
                    	"id": 0,
                        "step": "파스타 요리 실패",
                        "result": "A error message",
                    }
                }
            },
        },
    },
)
async def pasta(***, pasta_num: int = 1):
    queue: asyncio.Queue = asyncio.Queue()

    pasta_set = []
    for i in range(pasta_num):
        pasta_id = i
        pasta = asyncio.create_task(pasta_cooking(pasta_id, queue, ***))
        pasta_set.append(pasta)

    return StreamingResponse(
        n_pasta_cooking(queue, pasta_num), media_type="text/event-stream"
    )

분명 공식 문서에서의 설명이나 이런 시도를 했던 누군가를 찾을 수 있지 않을까 싶었지만 (GPT 마저도...), 결국 찾지 못해서 구현에 시간이 오래 걸렸다. 구현 자체는 프론트엔드에서의 동시성처리가 백엔드로 넘어간 것뿐이라서 고생에 비해 크게 달라진 것이 없는게 아쉬울 뿐이다.

profile
SKKU_CSE_23

0개의 댓글