본 글은 Python의 동시성 개념과 이를 활용한 Streamlit에서의 멀티스레딩 구현에 대해 다룹니다. 동시성 개념, ThreadPoolExecutor의 사용법 등, Streamlit, Lanchain을 기본적으로 이해하고 있다는 전제로 작성되었습니다. 개념이 생소하다면 Python 동시성 개념 정리, Streamlit, Langchain을 먼저 참고하시는 것을 권장합니다.
같은 프롬프트에 대해 여러 LLM의 결과를 손쉽게 비교하고 싶었습니다. LLM API 호출은 Langhchain을 활용하여, 결과 비교는 GPT, Claude 등의 chat-UI와 유사한 UI를 손쉽게 만들어주는 Streamlit을 활용하기로 했습니다. 또한, 각 API를 순차적으로 처리하여 화면에 보여주는 것이 아니라 동시에 여러 API 호출하고 그 결과가 화면에 즉시 보이게 하고 싶었습니다. 그러나 아쉽게도 아쉽게도 Streamlit에서는 multi-write-stream을 제공하지 않습니다. 또한 Langchain에서도 batch stream 기능을 제공할 계획이 없다고 합니다. (GitHub 이슈 참고)
이를 해결하기 위해 Python의 동시성 처리 방법을 살펴보고, 이를 활용하여 문제를 해결해 보고자 합니다.
동시성은 여러 작업을 동시에 처리하는 것처럼 보이게 하는 기법입니다. Python은 GIL(Global Interpreter Lock) 때문에 스레드를 통한 진정한 병렬 처리는 어렵지만, I/O 바운드 작업에서는 동시성을 통해 효율적인 처리가 가능합니다. Python은 multi-threading과 asyncio 등을 통해 동시성을 구현할 수 있으며, 본 글에서는 multi-threading을 중심으로 설명합니다.
우선, multi-threading의 기본 개념을 이해하기 위해 LangChain의 배치 작업 예시를 살펴보겠습니다. Langchain에서는 멀티스레딩을 사용해 여러 작업을 병렬로 처리합니다. 이 작업은 아래와 같이 ThreadPoolExecutor
를 통해 이루어집니다.
with get_executor_for_config(configs[0]) as executor:
# get_executor_for_config는 ThreadPoolExecutor을 상속 받아 구현한 객체입니다.
return cast(List[Output], list(executor.map(invoke, inputs, configs)))
이 코드는 다소 복잡해 보일 수 있지만, 간단히 submit
으로 바꿔보면 더 쉽게 이해할 수 있습니다.
with get_executor_for_config(configs[0]) as executor:
futures: List[Future] = [
executor.submit(invoke, input, config)
for input, config in zip(inputs, configs)
]
results: List[Output] = []
for future in futures:
results.append(future.result())
return cast(List[Output], results)
submit
함수는 스레드에 작업을 던져주고 현재 스레드는 결과를 기다리지 않고 다음 라인 코드로 넘어갑니다. 그렇다면 우리는 어떻게 해당 스레드의 작업 상태와 결과를 확인할 수 있을까요? 바로 submit
이 반환하는 Future 객체를 통해 가능합니다.
Future 객체는 스레드의 상태, 결과 등을 기억하고 있는 객체입니다.
class Future(object):
def __init__(self):
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
이를 통해 비동기 작업의 상태, 완료 여부, 결과 등을 확인할 수 있으며, 콜백 등록도 가능합니다.
정리하자면, ThreadPoolExecutor는 작업마다 Future 객체를 생성해 _worker
함수를 통해 비동기로 작업을 처리해 여러 작업을 동시에 효율적으로 처리할 수 있습니다. (자세한 사항은 Python의 ThreadPoolExecutor 코드를 참고하세요. 또한 python thread는 C 언어 네이티브 스레드 라이브러리를 통해 생성이 됩니다. 또한 Thread를 직접 생성할 경우 Future객체는 생성되지 않습니다.)
이제 이 지식을 활용하여 Streamlit에서 여러 LLM을 동시에 호출하고 결과를 실시간으로 스트리밍해 보겠습니다. Thread를 직접 생성하고 관리하는 것은 복잡할 수 있으므로, Python의 ThreadPoolExecutor
를 사용하였습니다. 그렇다면 아래 코드를 실행하면 여러 LLM 호출과 결과가 화면에 stream으로 표시되지 않을까요?
def v1(llm : BaseChatModel, message:List[BaseMessage]):
with st.chat_message("assistant"):
response = st.write_stream(llm.stream(message))
return response
with ThreadPoolExecutor(max_workers=2) as executor:
results = [executor.submit(tempt, llm, message) for llm, message in zip(llms, messages)]
하지만 위 코드를 실행하면 'ThreadPoolExecutor-2_0': missing ScriptRunContext
와 같은 경고와 화면에 아무 것도 출력되지 않는 것을 알 수 있습니다. 왜 그럴까요?
ScriptRunContext는 Streamlit에서는 화면의 어느 부분과 어느 위치에 출력해야 하는지를 지정해 주는 정보를 가지고 있는 객체입니다. 아래 코드를 보면 각 스레드는 자신만의 ScriptRunContext를 가지고 있어, 자신이 어느 화면, 어떤 위치에 데이터를 출력해야 하는지 알고 있습니다.
def get_script_run_ctx(suppress_warning: bool = False) -> ScriptRunContext | None:
"""
Parameters
----------
suppress_warning : bool
If True, don't log a warning if there's no ScriptRunContext.
Returns
-------
ScriptRunContext | None
The current thread's ScriptRunContext, or None if it doesn't have one.
"""
thread = threading.current_thread()
ctx: ScriptRunContext | None = getattr(thread, SCRIPT_RUN_CONTEXT_ATTR_NAME, None)
v1
코드는 멀티 스레드를 사용하고, 각 스레드는 ScriptRunContext
가 메인 스레드와 다르거나 None
인 것을 추측할 수 있습니다. 그렇기 때문에 각 스레드는 어느 화면의 어떤 위치에 표시해야 하는지를 모르는 상태입니다.
따라서, 여러 스레드가 동시에 화면 정보를 공유하면서 작업을 수행하려면, 각 스레드에 메인 스레드의 ScriptRunContext
객체를 전달해 주어야 합니다. 이를 구현해 봅시다.
def v2(context : ScriptRunContext, llm : BaseChatModel, message:List[BaseMessage]):
add_script_run_ctx(ctx=context)
with st.chat_message("assistant"):
response = st.write_stream(llm.stream(message))
return response
context = get_script_run_ctx()
with ThreadPoolExecutor(max_workers=2) as executor:
results = [executor.submit(tempt, context, llm, message) for llm, message in zip(llms, messages)]
짠! 이렇게 간단하게 멀티스레딩을 활용해 LLM 모델의 호출을 동시에 스트림으로 구현하였습니다. 물론 각 LLM 모델 응답을 저장하고, 다음 호출에 이전 대화 기록을 함께 보내야 하는 작업 등의 다양한 작업이 남아 있습니다.
본 글에서는 Python의 동시성을 활용해 Streamlit에서 여러 LLM을 동시에 호출하고 결과를 실시간으로 스트리밍하는 방법을 살펴보았습니다. 동시성 개념을 잘 이해하고 이를 응용하면 다양한 상황에서 더욱 효율적인 코드를 작성할 수 있습니다. 사실 Streamlit의 write_stream과 LangChain의 stream 기능은 Python의 Generator와 깊은 관련이 있습니다. 다음 글에서는 Generator의 개념을 확장한 Python의 asyncio를 활용한 비동기 프로그래밍에 대해 알아보겠습니다. 또한, 본 글에서 다루지 않았던 동시성을 구현 할 때의 '주의점'도 함께 살펴볼 예정입니다.
긴 글 읽어주셔서 감사합니다 :)