글을 작성하기 전에 미리 말을 하자면, 학교에서 진행한 프로젝트에서 사용한 코드라 야매라는 점을 꼭 말하고 싶다. 실제로 아래처럼 굴린다면 무슨 문제가 생길 지 장담하지 못한다...
우리는 파스타 네 개를 동시에 만들고 API로 각 파스타의 진행 상황을 공유받고 싶었다.
기존 처리 방식은 '파스타 네 개를 동시에 만드는 것'을 프론트에게 떠넘기는 작전이었다. (아무래도 내가 프론트와 백 모두 하고 있었기에 가능한 작전이 아니었을까...)
장점
단점
import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json
import httpx
from urllib.parse import urlencode
load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT")
# 배포 시를 고려해 .env를 이용한 전역 변수 사용
router = APIRouter()
async def pasta_cooking(***):
try:
#### 재료 준비 ####
query = {***}
query_string = urlencode(query)
url = f"{public_api_endpoint}/material/?{query_string}"
async with httpx.AsyncClient() as client:
response = await client.get(url) # 재료 준비 API 호출
if response.status_code != 200: # 응답 오류
raise Exception(response.json())
result = response.json()["data"]
yield json.dumps({
"step": "재료 준비",
"result": result
})+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시
#### 물 끓이기 ####
url = f"{public_api_endpoint}/water_boil/"
async with httpx.AsyncClient() as client:
response = await client.get(url) # 물 끓이기 API 호출
if response.status_code != 200: # 응답 오류
raise Exception(response.json())
result = response.json()["data"]
yield json.dumps({
"step": "물 끓이기",
"result": result
})+ "\n" # 진행 상황 전송, '\n'으로 전송이 끝났음을 표시
#### 이후 내용 중략... ####
except Exception as e:
yield json.dumps({
"step": "파스타 요리 실패",
"result": str(e),
})+ "\n"
@router.get(
"/pasta/",
summary="***",
description="***",
tags=["Pasta"],
responses={
200: {
"description": "***",
"content": {
"application/json": {
"example": {
"step": "",
"result": {
"": ""
},
}
}
},
},
422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
"description": "Fail to cooking a pasta",
"content": {
"application/json": {
"example": {
"step": "파스타 요리 실패",
"result": "A error message",
}
}
},
},
},
)
async def pasta(***):
return StreamingResponse(
pasta_cooking(***),
media_type="text/event-stream"
)
새로운 처리 방식을 들여 온 이유는 단지 배포한 Streamlit 웹앱에서 문제가 생겼기 때문이다. (근데 문제 원인은 프론트에 작업을 떠넘긴 것이 아니라, '파스타 요리 실패' 응답을 보내는 로직이 모종의 이유로 작동하지 않았기 때문이었다...)
장점
단점
import os
import json
import requests
import sseclient
from dotenv import load_dotenv
from urllib.parse import urlencode
load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT")
query = {***}
query_string = urlencode(query)
url = (f"{public_api_endpoint}/pasta/?{query_string}&pasta_num=4"
with requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) as response:
client = sseclient.SSEClient(response)
for event in client.events():
if event.event == "end":
break # API 응답 종료
if event.data:
client_data = json.loads(event.data)
## 내용 생략 ##
방식 자체는 '기존 프론트엔드'처럼 동시에 함수를 실행하되, 각각의 진행 상황을 ID를 추가해 Queue에 저장하고 빼내는 방식으로 유사함
import os
from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json
import httpx
import asyncio
from typing import AsyncGenerator
from urllib.parse import urlencode
load_dotenv()
public_api_endpoint = os.getenv("PUBLIC_API_ENDPOINT")
# 배포 시를 고려해 .env를 이용한 전역 변수 사용
router = APIRouter()
async def pasta_cooking(pasta_id: int, queue: asyncio.Queue, ***):
try:
#### 재료 준비 ####
query = {***}
query_string = urlencode(query)
url = f"{public_api_endpoint}/material/?{query_string}"
async with httpx.AsyncClient() as client:
response = await client.get(url) # 재료 준비 API 호출
if response.status_code != 200: # 응답 오류
raise Exception(response.json())
result = response.json()["data"]
await queue.put({
"id": pasta_id,
"step": "재료 준비",
"result": result
})
#### 물 끓이기 ####
url = f"{public_api_endpoint}/water_boil/"
async with httpx.AsyncClient() as client:
response = await client.get(url) # 물 끓이기 API 호출
if response.status_code != 200: # 응답 오류
raise Exception(response.json())
result = response.json()["data"]
await queue.put({
"id": pasta_id,
"step": "물 끓이기",
"result": result
})
#### 이후 내용 중략... ####
except Exception as e:
await queue.put({
"id": pasta_id,
"step": "파스타 요리 실패",
"result": str(e),
})
async def n_pasta_cooking(
queue: asyncio.Queue, pasta_num: int
) -> AsyncGenerator[str, None]:
pasta_finish = 0 # 실패/완성된 파스타 수
while pasta_finish < pasta_num:
data = await queue.get()
if data["step"] == "파스타 요리 실패" or data["step"] == "파스타 요리 완성":
pasta_finish += 1
yield f"data: {json.dumps(data)}\n\n"
yield "event: end\ndata: Stream closed\n\n" # 파스타 요리 종료
@router.get(
"/pasta/",
summary="***",
description="***",
tags=["Pasta"],
responses={
200: {
"description": "***",
"content": {
"application/json": {
"example": {
"id": 0,
"step": "",
"result": {
"": ""
},
}
}
},
},
422: { # 실제로 422 에러를 날리지는 않지만, 요리 실패 시를 명시하기 위해
"description": "Fail to cooking a pasta",
"content": {
"application/json": {
"example": {
"id": 0,
"step": "파스타 요리 실패",
"result": "A error message",
}
}
},
},
},
)
async def pasta(***, pasta_num: int = 1):
queue: asyncio.Queue = asyncio.Queue()
pasta_set = []
for i in range(pasta_num):
pasta_id = i
pasta = asyncio.create_task(pasta_cooking(pasta_id, queue, ***))
pasta_set.append(pasta)
return StreamingResponse(
n_pasta_cooking(queue, pasta_num), media_type="text/event-stream"
)
분명 공식 문서에서의 설명이나 이런 시도를 했던 누군가를 찾을 수 있지 않을까 싶었지만 (GPT 마저도...), 결국 찾지 못해서 구현에 시간이 오래 걸렸다. 구현 자체는 프론트엔드에서의 동시성처리가 백엔드로 넘어간 것뿐이라서 고생에 비해 크게 달라진 것이 없는게 아쉬울 뿐이다.