현재 진행중인 프로젝트는 FastAPI로 Lanchain을 사용하여 LLM서비스를 기획하는 것이다. 대화 저장용 DB는 MongDB였는데 기존 langchain의 저장 방법 변경부터 계속해서 MongoDB에 대화가 저장될 때 추가적인 도큐먼트 저장까지의 과정이 담겨있다. 가장 큰 건 새로운 도큐먼트 생성을 방지한 것이다.
일단 우리의 MongoDB 설계에는 기존 MongoDBChatMessageHistory에 있는 것보다 더 많은 것을 저장해야 했다.
기존 MongoDBChatMessageHistory는
def __init__(
self,
connection_string: Optional[str],
session_id: str,
database_name: str = DEFAULT_DBNAME,
collection_name: str = DEFAULT_COLLECTION_NAME,
*,
session_id_key: str = DEFAULT_SESSION_ID_KEY,
history_key: str = DEFAULT_HISTORY_KEY,
create_index: bool = True,
history_size: Optional[int] = None,
index_kwargs: Optional[Dict] = None,
client: Optional[MongoClient] = None,
해당 정보들이 저장된다. 사실상 session_id와 history만 저장된다.
{
"session_id": "1-0b585eb7f37841228d0f827e8f745414",
"history": "[{\"type\": \"human\", \"data\": {\"content\": \"첫번째 메시지\", ...}}]"
}
이런식으로 저장된다고 보면 된다. session_id는 우리가 추가적으로 생성할 것이지만 history는 어떻게 쌓이는 것인가?
이는 Langchain의 기본 구조에서 온다고 한다.
langchain_core.messages.py파일에 message_to_dict라는 함수라고 한다. 자세히는 잘 모르겠다..
어쨋든 이런식으로 저장이 되는데 우리는 created_time, category 등의 정보들이 추가로 필요하기에 상속받아서 해결했다.
기존의 사용방식
chain_with_history = RunnableWithMessageHistory(
chain,
lambda session_id: CustomMongoDBChatMessageHistory(
session_id=session_id,
connection_string=settings.MONGODB_URL,
database_name=settings.MONGODB_DB_NAME,
collection_name=settings.MONGODB_COLLECTION,
),
input_messages_key="input",
history_messages_key="history",
)
해당 방식대로 하면 오직 대화내용만이 저장된다. 따라서
커스텀한 몽고디비
class CustomMongoDBChatMessageHistory(MongoDBChatMessageHistory):
def __init__(
self,
session_id: str,
connection_string: str,
database_name: str,
collection_name: str,
category: Optional[str] = None,
chat_title: Optional[str] = None,
after_keyword: Optional[List[str]] = None,
before_keyword: Optional[List[str]] = None,
report: Optional[dict] = None,
user_id: Optional[str] = None
):
super().__init__(
session_id=session_id,
connection_string=connection_string,
database_name=database_name,
collection_name=collection_name,
session_id_key="session_id",
history_key="history"
)
self.category = category
self.chat_title = chat_title
self.after_keyword = after_keyword or []
self.before_keyword = before_keyword or []
self.report = report or {}
self.user_id = user_id
이런식으로 나에게 필요한 부분들을 추가로 저장하도록 변경하였다. 유저아이디, 채팅방제목, 키워드, 리포트, 생성 시간 등이 포함되었다.
첫 번째 문제: KeyError: 'History'
해당 에러가 뜨는 이유는 대소문자 구별이다(...) history_key="history"같은 것을 잘 정리해야한다.
두 번째 문제: TypeError: the JSON object must be str, bytes or bytearray, not list
나의 경우 세션을 생성하는 함수를 따로 만들었는데
def create_session(self) -> None:
"""세션 Document 생성"""
if self._session_exists():
# 세션이 이미 있으면 만들지 않음
return
session_info = {
"session_id": self.session_id,
"user_id": self.user_id,
"category": self.category,
"chat_title": self.chat_title,
"after_keyword": self.after_keyword,
"before_keyword": self.before_keyword,
"report": self.report,
"created_at": datetime.now(),
"history": "[]"
}
self.collection.insert_one(session_info)
-> 어떻게 고쳤는지 기억이 안난다..
업데이트할 때
self.collection.update_one(
{"session_id": self.session_id},
{"$set": {"history": json.dumps(current_messages)}} # 리스트를 JSON 문자열로 변환
)messages_data = json.loads(doc["history"]) # JSON 문자열을 리스트로 변환
이런식으로 수정했던 것 같다..
가장 큰 문제였다. 사실상 위의 문제들보다 이 문제들을 고치면서 위의 문제들이 발생하게 된다.
MongoDBChatMessageHistory의 소스코드 자체에 들어가보면
def add_message(self, message: BaseMessage) -> None:
"""Append the message to the record in MongoDB"""
try:
self.collection.insert_one(
{
self.session_id_key: self.session_id,
self.history_key: json.dumps(message_to_dict(message)),
}
)
except errors.WriteError as err:
logger.error(err)
코드를 볼 수 있다. 즉 새로운 메세지가 주고 받을 때마다 이 함수가 실행되는데 insert_one이 있다. 새로운 도큐먼트를 생성해서 넣는다는 것이다. 나의 코드가 문제가 아니라 프레임워크 자체가 이렇게 구현되어 있던 것이다. 따라서 두가지의 메서드를 직접 오버라이드했다.
def add_message(self, message: BaseMessage) -> None:
"""메시지 추가"""
if not self._session_exists():
self.create_session()
# 현재 저장된 메시지 가져오기
doc = self.collection.find_one({"session_id": self.session_id})
current_messages = json.loads(doc["history"]) if doc.get("history") else []
# 새 메시지를 딕셔너리로 변환하여 추가
message_dict = message_to_dict(message)
current_messages.append(message_dict)
# 업데이트
self.collection.update_one(
{"session_id": self.session_id},
{"$set": {"history": json.dumps(current_messages)}}
)
@property
def messages(self) -> List[BaseMessage]:
"""메시지 목록 조회"""
doc = self.collection.find_one({"session_id": self.session_id})
if doc is None or not doc.get("history"):
return []
try:
messages_data = json.loads(doc["history"])
if not isinstance(messages_data, list):
return []
return messages_from_dict(messages_data)
except (json.JSONDecodeError, TypeError):
return []