FastAPI에서 진행 상황을 전송하기 (근데, 동시성처리를 곁들인...)

반야·2024년 10월 5일
0

글을 작성하기 전에 미리 말을 하자면, 학교에서 진행한 프로젝트에서 사용한 코드라 야매라는 점을 꼭 말하고 싶다. 실제로 아래처럼 굴린다면 무슨 문제가 생길 지 장담하지 못한다...



🫠 FastAPI에서 진행 상황을 전송하려는 이유

  1. 현재 프로젝트에서 사용하는 API가 여러 단계의 API를 연속적으로 호출하는 형태임.
    • 예를 들자면, '파스타 요리 API'를 만드는데, 그 API 내부에서 '재료 준비 API', '물 끓이기 API', '면 삶기 API', '소스 만들기 API', ...를 연속적으로 호출하는 상황.

  2. 프론트엔드 측에서 현재 진행 상황을 공유받고 싶어함.
    • 즉, 내 파스타가 어느 단계까지 만들어 졌는지, 그 단계까지 진행한 결과는 어떻게 되는지를 표시하고자 함. (현재, 재료 준비를 완료했고, 무슨무슨 재료를 준비했다 등)


😵 문제 상황

우리는 파스타 네 개를 동시에 만들고 API로 각 파스타의 진행 상황을 공유받고 싶었다.


🤪 기존 처리 방식

기존 처리 방식은 '파스타 네 개를 동시에 만드는 것'을 프론트에게 떠넘기는 작전이었다. (아무래도 내가 프론트와 백 모두 하고 있었기에 가능한 작전이 아니었을까...)

장점

  • FastAPI에서 StreamingResponse로 진행 상황을 전송하는 코드를 GPT에게서 얻을 수 있었다. (구현이 쉬웠음)

단점

  • 프론트엔드에서 동시성 처리를 하는 것이 일반적이지 않다.
  • 프론트엔드 작업을 Streamlit로 해서 Python을 사용했기에 동시성 처리가 가능했지, 아마 React에서 TS를 썼다면 불가능 했을지도...

기존 프론트엔드

  1. Thread를 이용해 동시에 4개의 API에 요청
  2. 각 API의 진행 상황을 Thread ID를 추가해 Queue에 Put
  3. Queue에서 Get하면서 각 파스타의 진행 상황을 프론트엔드에 표시
  • 이때 중요한 것은 각 파스타 요리의 성공/실패 여부를 확인해서, API 응답이 종료되었는지를 감지해야한다.

기존 백엔드 코드 일부

import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

import json
import httpx
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT") 
# 배포 시를 고려해 .env를 이용한 전역 변수 사용

router = APIRouter()

async def pasta_cooking(***):
    try:
        #### 재료 준비 ####
        query = {***}
        query_string = urlencode(query)
        
        url = f"{public_api_endpoint}/material/?{query_string}"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 재료 준비 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        yield json.dumps({
            "step": "재료 준비",
            "result": result
        })+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시
        
        
        #### 물 끓이기 ####
        url = f"{public_api_endpoint}/water_boil/"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 물 끓이기 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        yield json.dumps({
            "step": "물 끓이기",
            "result": result
        })+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시


        #### 이후 내용 중략... ####
        

    except Exception as e:
        yield json.dumps({
            "step": "파스타 요리 실패",
            "result": str(e),
        })+ "\n"
        

@router.get(
    "/pasta/",
    summary="***",
    description="***",
    tags=["Pasta"],
    responses={
        200: {
            "description": "***",
            "content": {
                "application/json": {
                    "example": {
                        "step": "",
                        "result": {
                            "": ""
                        },
                    }
                }
            },
        },
        422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
            "description": "Fail to cooking a pasta",
            "content": {
                "application/json": {
                    "example": {
                        "step": "파스타 요리 실패",
                        "result": "A error message",
                    }
                }
            },
        },
    },
)
async def pasta(***):
    return StreamingResponse(
        pasta_cooking(***), 
        media_type="text/event-stream"
    )

🤯 새로운 처리 방식

새로운 처리 방식을 들여 온 이유는 단지 배포한 Streamlit 웹앱에서 문제가 생겼기 때문이다. (근데 문제 원인은 프론트에 작업을 떠넘긴 것이 아니라, '파스타 요리 실패' 응답을 보내는 로직이 모종의 이유로 작동하지 않았기 때문이었다...)


장점

  • 백엔드에서 동시성 처리를 해서 프론트엔드로 보내는 것이 일반적인 방식이다.

단점

  • FastAPI에서 StreamingResponse로 진행 상황을 전송하는 함수를 동시성 처리하는 코드의 예시를 찾을 수 없었다. (구현이 어려웠음)

새로운 프론트엔드 코드 일부

import os
import json
import requests
import sseclient
from dotenv import load_dotenv
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT")

query = {***}
query_string = urlencode(query)

url = (f"{public_api_endpoint}/pasta/?{query_string}&pasta_num=4"
with requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) as response:
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.event == "end":
            break # API 응답 종료
        if event.data:
            client_data = json.loads(event.data)
            ## 내용 생략 ##

새로운 백엔드 코드 일부

방식 자체는 '기존 프론트엔드'처럼 동시에 함수를 실행하되, 각각의 진행 상황을 ID를 추가해 Queue에 저장하고 빼내는 방식으로 유사함

import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

import json
import httpx
import asyncio
from typing import AsyncGenerator
from urllib.parse import urlencode

load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT") 
# 배포 시를 고려해 .env를 이용한 전역 변수 사용

router = APIRouter()

async def pasta_cooking(pasta_id: int, queue: asyncio.Queue, ***):
    try:
        #### 재료 준비 ####
        query = {***}
        query_string = urlencode(query)
        
        url = f"{public_api_endpoint}/material/?{query_string}"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 재료 준비 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        await queue.put({
        	"id": pasta_id,
            "step": "재료 준비",
            "result": result
        })
        
        
        #### 물 끓이기 ####
        url = f"{public_api_endpoint}/water_boil/"

        async with httpx.AsyncClient() as client:
            response = await client.get(url) # 물 끓이기 API 호출

        if response.status_code != 200: # 응답 오류
            raise Exception(response.json())

        result = response.json()["data"]

        await queue.put({
            "id": pasta_id,
            "step": "물 끓이기",
            "result": result
        })


        #### 이후 내용 중략... ####
        

    except Exception as e:
        await queue.put({
            "id": pasta_id,
            "step": "파스타 요리 실패",
            "result": str(e),
        })


async def n_pasta_cooking(
    queue: asyncio.Queue, pasta_num: int
) -> AsyncGenerator[str, None]:
    pasta_finish = 0 # 실패/완성된 파스타 수
    while pasta_finish < pasta_num:
        data = await queue.get()
        if data["step"] == "파스타 요리 실패" or data["step"] == "파스타 요리 완성":
            pasta_finish += 1
        yield f"data: {json.dumps(data)}\n\n"

    yield "event: end\ndata: Stream closed\n\n" # 파스타 요리 종료


@router.get(
    "/pasta/",
    summary="***",
    description="***",
    tags=["Pasta"],
    responses={
        200: {
            "description": "***",
            "content": {
                "application/json": {
                    "example": {
                    	"id": 0,
                        "step": "",
                        "result": {
                            "": ""
                        },
                    }
                }
            },
        },
        422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
            "description": "Fail to cooking a pasta",
            "content": {
                "application/json": {
                    "example": {
                    	"id": 0,
                        "step": "파스타 요리 실패",
                        "result": "A error message",
                    }
                }
            },
        },
    },
)
async def pasta(***, pasta_num: int = 1):
    queue: asyncio.Queue = asyncio.Queue()

    pasta_set = []
    for i in range(pasta_num):
        pasta_id = i
        pasta = asyncio.create_task(pasta_cooking(pasta_id, queue, ***))
        pasta_set.append(pasta)

    return StreamingResponse(
        n_pasta_cooking(queue, pasta_num), media_type="text/event-stream"
    )

분명 공식 문서에서의 설명이나 이런 시도를 했던 누군가를 찾을 수 있지 않을까 싶었지만 (GPT 마저도...), 결국 찾지 못해서 구현에 시간이 오래 걸렸다. 구현 자체는 프론트엔드에서의 동시성처리가 백엔드로 넘어간 것뿐이라서 고생에 비해 크게 달라진 것이 없는게 아쉬울 뿐이다.

profile
𝚂𝙺𝙺𝚄 𝙲𝚂𝙴 𝟸𝟹

0개의 댓글