LangChain 기반 챗봇에 동시 접속 하기(feat. session_id)

JeongYun Lee·2025년 6월 17일

Chatbot

목록 보기
4/5
post-thumbnail

LangGraph를 활용하여 RAG 기반 챗봇을 구축하였고, 도커라이징 후 디플로이를 진행했는데, 테스트 과정에서 예상치 못한 오류를 발견했다. 바로 서로 다른 다수의 사용자들이 동시에 접속해서 쿼리를 날렸을 때 이 기록들이 섞여서 질문하지 않은 내용의 답변이 도출되는 것이다. 이 문제는 multi-turn 형식을 지원하기 위해 이전 히스토리를 남기는 부분에서 비롯하는데, 여러명이 동시 접속하면 이 히스토리가 어떤 접속에서 비롯됐는지 구분되지 않아서 발생하는 문제이다. 개발 과정에서는 혼자 테스트 했기 때문에 이 문제를 전혀 고려하지 못했다...

처음에 이를 구현하기 위해 로그인 방식을 도입해야 하나..? 고민했지만, session_id로 간단하게 구현할 수 있다는 것을 알게 되었다. 사실 기존에서 chat_history 때문에 session_id를 주긴 했지만, 아무값이나 넣고 돌려쓰는 방식으로 테스트 했고, 큰 문제가 없다고 생각했다. 그러나 위에서 설명한 문제를 해결하기 위해서는 반드시 개별 채팅마다 새로운 아이디가 부여 되어야 하고, reset을 하거나 새로고침 하기 전까지는 동일한 아이디가 유지되어야 한다. 그리고 reset과 새로고침을 한 뒤에는 새로운 session_id가 부여되고, chat_history와 graphstate는 초기화해주어야 한다.

Backend

파이썬과 Fastapi를 기준으로 한다.

class GraphState(TypedDict):
    question: str # 질문
    q_type: str  # 질문의 유형
    context: list | str  # 문서의 검색 결과
    answer: str | list[str]   # llm이 생성한 답변
    relevance: str  # 답변의 문서에 대한 관련성 (groundness check)
    session_id: str  # 세션 ID 추가

# 전역 변수들
store = {}

# 세션 ID를 기반으로 세션 기록을 가져오는 함수
def get_session_history(session_ids):
    if session_ids not in store:
        store[session_ids] = ChatMessageHistory()
    else:
        pass
    return store[session_ids]

# 새로운 세션 ID 생성 함수
def generate_session_id():
    return str(uuid.uuid4())

session_id를 전부 기록해두고 싶다면 Redis와 같은 별도의 데이터베이스를 사용해줘야 하지만, 현재는 로그인 기능을 구현하고 싶은게 아니기 때문에 그냥 전역 변수 안에 아이디와 히슽토리를 저장하고, 이 변수가 비워지는 방식으로 구현하였다.

상단에서 LangGraph에서 사용되는 GraphState를 정의하고, 저장할 store라는 변수를 만든다. 그리고generate_session_id라는 함수에서는 uuid를 사용해서 임의의 아이드를 생성한다.

그러나 이렇게 구현한 경우, 이제 응답에는 문제 없이 잘 구현되었으나, 동시에 쿼리를 하면 store에 저장하는 부분의 로직이 충돌이 발생해 일시적인 에러가 발생했다. 따라서 threading을 사용해서 해당 부분들이 병렬적으로 실행될 수 있도록 수정해주었다.

import threading
from collections import defaultdict

class ThreadSafeStore:
    def __init__(self):
        self._store = {}
        self._lock = threading.RLock()  # 재진입 가능한 락
   
   def get_session_history(self, session_id: str):
        with self._lock:
            if session_id not in self._store:
                self._store[session_id] = ChatMessageHistory()
            return self._store[session_id]
  
   def clear_session(self, session_id: str = None):
        with self._lock:
            if session_id:
                if session_id in self._store:
                    message_count = len(self._store[session_id].messages)
                    del self._store[session_id]
                    return message_count
                return 0
            else:
                total_sessions = len(self._store)
                total_messages = sum(len(history.messages) for history in self._store.values())
                self._store.clear()
                return total_sessions, total_messages
   
   def get_stats(self):
        with self._lock:
            return {
                'total_sessions': len(self._store),
                'total_messages': sum(len(history.messages) for history in self._store.values())
            }

# 전역 스레드 안전 저장소
thread_safe_store = ThreadSafeStore()

# 세션 ID를 기반으로 세션 기록을 가져오는 함수
def get_session_history(session_ids):
    return thread_safe_store.get_session_history(session_ids)
@app.post("/api/")
async def stream_responses(request: Request):
    try:
        data = await request.json()
        message = data.get('message')
        client_session_id = data.get('session_id')
        
        if not message:
            raise HTTPException(status_code=400, detail="Message is required")

        # 🔧 핵심 수정: session_id가 없을 때만 새로 생성
        if not client_session_id:
            client_session_id = generate_session_id()
        else:
            pass

        config = RunnableConfig(
            recursion_limit=15, 
            configurable={
                "thread_id": "HIKE-JUSOCHATBOT-DEMO", 
                "user_id": current_user_id, 
                "session_id": client_session_id
            }
        )

        inputs = GraphState(
            question=message,
            session_id=client_session_id,
            q_type='',
            context='',
            answer='',
            relevance='',
        )

        try:
            final_state = graph.invoke(inputs, stream_mode="values", config=config)
            answer_text = final_state["answer"]
            
            # 응답에 현재 세션의 메시지 수 포함 (디버깅용)
            current_history = get_session_history(client_session_id)
            message_count = len(current_history.messages)
            
            return {
                "answer": answer_text,
                "session_id": client_session_id,  # 클라이언트가 다음에 사용할 수 있도록 반환
                "message_count": message_count
            }
            
        except GraphRecursionError as e:
            print(f"Recursion limit reached: {e}")
            return {
                "answer": "죄송합니다. 해당 질문에 대해서는 답변할 수 없습니다.",
                "session_id": client_session_id
            }
        except Exception as e:
            print(f"An error occurred: {e}")
            return {
                "answer": "죄송합니다. 처리 중 오류가 발생했습니다.",
                "session_id": client_session_id
            }

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

이건 챗팅을 위한 api로, 우선 data는 프론트에서 들어온 입력값을 json형태로 받아온다. 이때 받아온 data에서 session_id가 없는 경우, 새로운 아이디를 부여하는 것이다. 정리하면, 이전 대화가 있었다면, 프론트로의 출력값에 session_id가 포함되어 있었을 것이고, 이는 다시 백으로 보내는 request, 즉 data에 넘어와야 하는데 이 session_id가 없다는 것은 새로운 채팅이 시작되었음을 의미한다. 따라서 새로운 아이디를 다시 부여해게 되는 것이다.

@app.post("/api/reset")
async def reset_store(request: Request):
    try:
        data = await request.json()
        session_id_to_reset = data.get('session_id')
        
        if session_id_to_reset:
            # 특정 세션만 초기화
            new_session_id = generate_session_id()
            
            return {
                "status": "Session reset successfully",
                "session_id": new_session_id,
            }
        else:
            # 모든 세션 초기화
            total_sessions, total_messages = thread_safe_store.clear_session()
            new_session_id = generate_session_id()
            
            return {
                "status": "All sessions reset successfully",
                "session_id": new_session_id,
                "cleared_sessions": total_sessions,
                "cleared_messages": total_messages
            }
            
    except Exception as e:
        print(f"❌ 리셋 오류: {e}")
        # 오류 발생시에도 새 세션 ID 반환
        new_session_id = generate_session_id()
        
        return {
            "status": "Sessions reset due to error",
            "session_id": new_session_id,
            "error": str(e)
        }

이제 reset 버튼을 누르면 부여된 session_id는 초기화되어야 한다. stream_responses와 마찬가지로 request로 받아온 data에 있는 session_id를 del을 사용해서 지워주는 간단한 로직이다.

Frontend

프론트는 Nuxt 프레임워크를 사용하였다.

const clearSession = () => {
    sessionId.value = null;
    messagesAndAnswers.value = [];
    message.value = '';
};

onMounted(() => {
    clearSession();
});

프론트에서도 reset 버튼을 활성화하기 위한 로직을 구현하고(이 부분은 생략하겠다. 간단한 api 통신 부분) reset을 누를 때마다, 그리고 onMount()가 연결될때마다(새로고침할 때마다) clearSession을 활용해서 session_id와 이전에 있었던 메세지들을 삭제해주면 된다.

.
.
.

생각해보면 간단한데, 처음엔 너무 당황스러웠다!
아 추가로, verifier나 router와 같은 모듈을 중간중간 사용한다면, 해당 모듈의 invoke 부분에도 config로 session를 반드시 주어야 한다. 그래야 해당 질문 쿼리의 기록들을 균일하게 받아오기 때문에!

profile
궁금한 건 많지만, 천천히 알아가는 중입니다

0개의 댓글