파일을 로드시켜 langchain
을 활용한 RAG 구축을 해 봤다.
오늘은 외부 폴더에서 불러온 다양한 (3개ㅎㅎ) 프롬프트를 활용했을때, RAG를 통해 나오는 출력값들을 파일로 저장해 확인해 보는 실습을 가졌다.
처음 이 실습을 할때
이게 뭐지?, 했다.
예시를 보는데 잘 이해가 안갔는데 .txt
와 Prompts/
를 통해 외부 폴더를 만들고 그 안에 프롬프트를 사용하라는 거구나 라고 겨우 이해할 수 있었다.
또한 출력값들을 파일로 자동저장하는 코드를 구성해야 했다.
(이름도 timestamp가 포함돼야 한다.)
어쨋든 온몸 비틀기 하면서 나온 결과를 살펴보자
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain.storage import LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables import RunnableMap
from datetime import datetime
# .env 파일 로드
# 환경 변수에서 API 키 불러오기
api_key = os.getenv("OPENAI_API_KEY")
# 모델 초기화
model = ChatOpenAI(model="gpt-4o-mini")
file_path = "인공지능최신동향.pdf"
# PDF 파일 경로
loader = PyPDFLoader(file_path=file_path)
docs = loader.load()
recursive_text_splitter = RecursiveCharacterTextSplitter(
splits = recursive_text_splitter.split_documents(docs)
# OpenAI 임베딩 모델 초기화
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 로컬 파일 저장소 설정
store = LocalFileStore("C:\\Users\\1\\Desktop\\emb")
# 캐시를 지원하는 임베딩 생성 - 임베딩시 계속 api 호출을 방지하기 위해 로컬에 임베팅 파일을 저장하는 형식
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
namespace=embeddings.model, # 기본 임베딩과 저장소를 사용하여 캐시 지원 임베딩을 생성
import faiss
from langchain_community.vectorstores import FAISS
vectorstore = FAISS.from_documents(documents=splits, embedding=cached_embedder)
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3}) # 가져올 청크 수를 3으로 늘림
class DebugPassThrough(RunnablePassthrough):
def invoke(self, *args, **kwargs):
output = super().invoke(*args, **kwargs)
print("Debug Output:", output)
return output
# 문서 리스트를 텍스트로 변환하는 단계 추가
class ContextToText(RunnablePassthrough):
def invoke(self, inputs, config=None, **kwargs): # config 인수 추가
# context의 각 문서를 문자열로 결합
context_text = "\n".join([doc.page_content for doc in inputs["context"]])
return {"context": context_text, "question": inputs["question"]}
def load_chat_prompt_template(prompt_path):
프롬프트 경로에서 프롬프트 파일을 읽어 시스템 메시지와 사용자 메시지로 분리하고
이를 사용하여 ChatPromptTemplate을 생성
with open(prompt_path, 'r', encoding='utf-8') as file:
prompt_text = file.read()
# 'system'과 'human'으로 분리
sections = prompt_text.strip().split('human') # 문자열 양쪽에 공백 제거 후, 'human'찾아 섹션을 나누고 리스트로 분리하기
# 정의 부분
system_prompt = ''
user_prompt = ''
if len(sections) == 2: # system과 human으로 분리되어 있을 때
system_prompt = sections[0].replace('system', '').strip()
user_prompt = sections[1].strip()
user_prompt = prompt_text.strip() # 아니면 바로 user_prompt로 할당.
# 메시지 리스트 생성
messages = []
if system_prompt:
messages.append(("system", system_prompt))
if user_prompt:
messages.append(("user", user_prompt))
# ChatPromptTemplate 생성
prompt = ChatPromptTemplate.from_messages(messages)
return prompt
# 체인 생성 함수
def create_rag_chain(prompt_template, retriever, model):
chain = (
"question": DebugPassThrough(),
"context": retriever
| ContextToText()
| prompt_template
| model
return chain
# 타임스탬프 생성
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 프롬프트 및 응답 폴더 설정
prompt_folder = 'Prompts'
prompt_files = [f for f in os.listdir(prompt_folder) if f.endswith('.txt')]
output_folder = 'responses'
os.makedirs(output_folder, exist_ok=True)
response = chain.invoke(query)
print(type(response.content)) # <class 'dict'>
while True:
query = input("질문을 입력하세요 (종료하려면 빈 줄 입력): ")
if not query.strip():
for prompt_file in prompt_files:
prompt_path = os.path.join(prompt_folder, prompt_file)
prompt_template = load_chat_prompt_template(prompt_path)
chain = create_rag_chain(prompt_template, retriever, model)
print(f"\nUsing prompt from {prompt_file}")
response = chain.invoke(query) # 문자열 요구 chain.invoke({"question": query}) -> chain.invoke(query) 수정
print("Final Response:")
# 응답 저장
output_file = f"{os.path.splitext(prompt_file)[0]}_{timestamp}_result.txt"
output_path = os.path.join(output_folder, output_file)
with open(output_path, 'a', encoding='utf-8') as file:
file.write("\nQuestion: " + query + "\nResponse: " + response.content + "\n")
print(f"Response saved to {output_file}")
당신은 질문-답변(Question-Answering)을 수행하는 친절한 AI 어시스턴트입니다. 당신의 임무는 주어진 문맥(context) 에서 주어진 질문(question) 에 답하는 것입니다.
검색된 다음 문맥(context) 을 사용하여 질문(question) 에 답하세요. 만약, 주어진 문맥(context) 에서 답을 찾을 수 없다면, 답을 모른다면 `주어진 정보에서 질문에 대한 정보를 찾을 수 없습니다` 라고 답하세요.
한글로 답변해 주세요. 단, 기술적인 용어나 이름은 번역하지 않고 그대로 사용해 주세요. Don't narrate the answer, just answer the question. Let's think step-by-step.
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Question: {question}
Context: {context}
You are an expert AI on a question and answer task.
Use the "Following Context" when answering the question. If you don't know the answer, reply to the "Following Text" in the header and answer to the best of your knowledge, or if you do know the answer, answer without the "Following Text". If a question is asked in Korean, translate it to English and always answer in Korean.
Following Text: "주어진 정보에서 답변을 찾지는 못했지만, 제가 아는 선에서 답을 말씀드려볼게요! **틀릴 수도 있으니 교차검증은 필수입니다!**"
Following Context: {context}
Question: {question}
Helpful Answer:
각 prompt 텍스트 파일의 내용을
앞의 코드들은 전 포스팅에서 설명을 잘(?)한 것 같아서 오늘은 추가한 코드와 변경된 코드 위주로 살펴보자.
def load_chat_prompt_template(prompt_path):
프롬프트 경로에서 프롬프트 파일을 읽어 시스템 메시지와 사용자 메시지로 분리하고
이를 사용하여 ChatPromptTemplate을 생성
with open(prompt_path, 'r', encoding='utf-8') as file:
prompt_text = file.read()
# 'system'과 'human'으로 분리
sections = prompt_text.strip().split('human') # 문자열 양쪽에 공백 제거 후, 'human'찾아 섹션을 나누고 리스트로 분리하기
# 정의 부분
system_prompt = ''
user_prompt = ''
if len(sections) == 2: # system과 human으로 분리되어 있을 때
system_prompt = sections[0].replace('system', '').strip()
user_prompt = sections[1].strip()
user_prompt = prompt_text.strip() # 아니면 바로 user_prompt로 할당.
# 메시지 리스트 생성
messages = []
if system_prompt:
messages.append(("system", system_prompt))
if user_prompt:
messages.append(("user", user_prompt))
# ChatPromptTemplate 생성
prompt = ChatPromptTemplate.from_messages(messages)
return prompt
with open(prompt_path, 'r', encoding='utf-8') as file:
에 내용을 읽어옴.sections = prompt_text.strip().split('human')
)하고, 'human'
이라는 문자열을 기준으로 나누어 리스트(sections
)로 분리.시스템 메시지와 사용자 메시지 분리
if len(sections) == 2
: sections
의 길이가 2이면, system
과 human
구문이 포함된 것으로 간주하고 각각 할당.system_prompt = sections[0].replace('system', '').strip()
: system
키워드를 제거하고 공백을 다듬은 후 시스템 메시지로 할당.user_prompt = sections[1].strip()
: 두 번째 요소를 사용자 메시지로 할당.else
: human
구문이 없으면 전체 텍스트를 user_prompt
에 할당.메시지 리스트 생성 및 ChatPromptTemplate 생성
리스트에 시스템 메시지와 사용자 메시지를 추가.ChatPromptTemplate.from_messages(messages)
를 호출하여 messages
리스트를 기반으로 프롬프트 템플릿을 생성하고 반환. 결론 : 이 함수는 주어진 텍스트 파일에서 시스템과 사용자 메시지를 분리하여 ChatPromptTemplate 객체로 반환하는 역할이다.
prompt를 시스템과, 휴먼으로 구분지어서 이런 로직을 짰는데, 이런식으로 하면 prompt.txt에 형식이 맞지 않은 프롬프트가 들어가면 구현이 안될것 같다.
(.txt에 prompt를 넣을때 형식을 맞추긴해서 돌아가긴 했다. 다만, 다른 형식의 프롬프트를 가져와서 적용하려면 이 코드를 수정해야 하는 불편함이 예상된다.)
# 체인 생성 함수
def create_rag_chain(prompt_template, retriever, model):
chain = (
"question": DebugPassThrough(),
"context": retriever
| ContextToText()
| prompt_template
| model
return chain
을 이용하여 체인 생성 함수를 구성했다.
을 사용함으로써question
를 독립적으로 준비하고, 이후의 체인에서 병합하여 사용하게 하여 입력을 효율적으로 처리하고 있습니다.
라고 한다...
# 타임스탬프 생성
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 프롬프트 및 응답 폴더 설정
prompt_folder = 'Prompts'
prompt_files = [f for f in os.listdir(prompt_folder) if f.endswith('.txt')]
output_folder = 'responses'
os.makedirs(output_folder, exist_ok=True)
코드 설명
prompt_folder = 'Prompts'
는 프롬프트 파일이 위치한 폴더의 이름prompt_files = [f for f in os.listdir(prompt_folder) if f.endswith('.txt')]
를 통해 prompt_folder
내의 모든 파일 및 디렉토리 이름을 가져온다..txt
로 끝나는 파일만 필터링하고, 이 파일들을 prompt_files
리스트에 저장prompt_files
는 prompt_folder
폴더 내에 있는 모든 텍스트 파일(.txt)의 목록을 포함output_folder = 'responses'
는 결과 파일들이 저장될 폴더 이름os.makedirs(output_folder, exist_ok=True)
폴더를 생성, exist_ok=True
는 폴더가 이미 존재할 경우 에러를 발생시키지 않고 그대로 진행하도록 설정하는 옵션responses
폴더가 없으면 새로 생성, 존재하면 아무것도 안함while True:
query = input("질문을 입력하세요 (종료하려면 빈 줄 입력): ")
if not query.strip():
for prompt_file in prompt_files:
prompt_path = os.path.join(prompt_folder, prompt_file)
prompt_template = load_chat_prompt_template(prompt_path)
chain = create_rag_chain(prompt_template, retriever, model)
print(f"\nUsing prompt from {prompt_file}")
response = chain.invoke(query) # 문자열 요구 chain.invoke({"question": query}) -> chain.invoke(query) 수정
print("Final Response:")
# 응답 저장
output_file = f"{os.path.splitext(prompt_file)[0]}_{timestamp}_result.txt"
output_path = os.path.join(output_folder, output_file)
with open(output_path, 'a', encoding='utf-8') as file:
file.write("\nQuestion: " + query + "\nResponse: " + response.content + "\n")
print(f"Response saved to {output_file}")
코드 설명
query = input("질문을 입력하세요 (종료하려면 빈 줄 입력): ")
if not query.strip():
에서 공백을 제거한 후 내용이 없으면 break
로 루프를 종료(간단한 종료 로직)for prompt_file in prompt_files:
리스트에 있는 각 프롬프트 파일에 대해 반복 작업을 수행prompt_path = os.path.join(prompt_folder, prompt_file)
와 prompt_file
을 결합하여 생성prompt_template = load_chat_prompt_template(prompt_path)
함수를 호출하여 프롬프트 파일을 읽고 템플릿을 생성chain = create_rag_chain(prompt_template, retriever, model)
함수를 사용하여 prompt_template
, retriever
, model
을 기반으로 체인을 생성response = chain.invoke(query)
를 사용하여 질문(query
)에 대한 응답을 생성{"question": query}
형태였으나, 문자열만 요구하므로 query
만 전달하도록 수정)print("Final Response:")
response = chain.invoke(query)
print(type(response.content)) # <class 'str'>
코드 설명
response = chain.invoke(query)
메서드를 사용하여 query
에 대한 응답을 생성합니다.query
를 입력으로 받아, 체인을 통해 처리된 결과를 response
변수에 저장합니다.print(type(response.content))
의 데이터 유형을 확인하기 위해 type()
함수를 사용하여 출력합니다.response.content
가 str
형식임을 확인