[FastApi]: langchain + llm에 StreamingResponse 사용하기

홍석·2023년 12월 4일

시간날때 작성..

  • 구글링 자료를 찾아봐도 BaseCallBackHandler에서 전역Queue를 사용하거나, 다른 이상한 방법들로 알려주는 내용밖에없었고, 이를 StreamingResponse로 변환하는 방법을 찾기 어려웠다.
  • 해결방법은 asyncio에서 create_task로 이벤트를 만들고 AsyncIteratorCallbackHandler()를 사용하여 .aiter()에서 token을 yield하는 방법을 사용하였다.
  • 해결방법은 구글링을 통해 얻은 정보를 조합하였다.
  • 전역변수로 qChain을 설정한것을, 함수내부로 이동하여 쓰레드, 코루틴 에러를 해결할 수 있었다.

FastApi - api

@router.get("")
async def chatToAI(chat: str):
    stream_it = AsyncIteratorCallbackHandler()
    gen = create_gen(chat, stream_it)

    return StreamingResponse(gen, media_type="text/event-stream")

LangChain

async def run_call(query: str, stream_it: AsyncIteratorCallbackHandler):
    llm = ChatOpenAI(temperature=0.5, model_name="gpt-3.5-turbo-16k",
                     openai_api_key=gpt_api_key, streaming=True,
                     callbacks=[stream_it],
                     )
    # Question & Answer chain 구성
    qChain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever = vectorstore.as_retriever(search_kwargs={'k': 3}),
        return_source_documents=False,
    )

    response = await qChain.acall(query)
    return response


async def create_gen(text: str, stream_it: AsyncIteratorCallbackHandler):
    task = asyncio.create_task(run_call(text, stream_it))
    async for token in stream_it.aiter():
        yield token
    await task
profile
bayy1216.tistory.com <- 블로그 이전했습니다 🥹

0개의 댓글