RAG & LANCHAIN (6)- Langchain 기반 RAG 기본 구조 update 버전

이영락·2024년 9월 2일
0

인공지능 공부

목록 보기
20/33

1. 환경 설정

  • API 키 로드, 필요한 라이브러리 불러오기, LangSmith 추적 설정 등은 동일하게 유지합니다.
from dotenv import load_dotenv
from langchain_teddynote import logging
import os

# 환경 변수 로드 (API 키 등)
load_dotenv()

# 모델 및 캐시 경로 설정
os.environ["TRANSFORMERS_CACHE"] = "./cache/"
os.environ["HF_HOME"] = "./cache/"

# LangSmith 추적 설정 (프로젝트 이름 입력)
logging.langsmith("RAG-Project")
LangSmith 추적을 시작합니다.

2. 문서 로드

  • 다양한 문서 형식(PDF, CSV, 웹페이지 등)에 대한 로딩과 텍스트 분할 방법을 고려합니다.
from langchain.document_loaders import PyMuPDFLoader, CSVLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter, CodeTextSplitter

def load_and_split_document(file_path, file_type="text"):
    # 문서 로더 선택
    if file_type == "pdf":
        loader = PyMuPDFLoader(file_path=file_path)
    elif file_type == "csv":
        loader = CSVLoader(file_path=file_path)
    elif file_type == "web":
        loader = WebBaseLoader(url=file_path)
    else:
        loader = TextLoader(file_path=file_path)
    
    # 문서 로드
    documents = loader.load()

    # 텍스트 분할기 선택
    if file_type == "code":
        text_splitter = CodeTextSplitter(chunk_size=1000, chunk_overlap=200)
    else:
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

    # 문서 분할
    documents = text_splitter.split_documents(documents)
    return documents

3. 임베딩 생성 및 벡터 저장소 구축

  • 다양한 임베딩 모델(Sentence Transformers, Hugging Face Models 등)을 선택할 수 있으며, 벡터 저장소로 FAISS 외에도 Chroma, Pinecone, Milvus 등을 고려할 수 있습니다.
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma, Pinecone

def create_vector_store(documents, embedding_model="openai", vector_store_type="faiss"):
    # 임베딩 모델 설정
    if embedding_model == "openai":
        embedding_model = OpenAIEmbeddings()
    elif embedding_model == "huggingface":
        embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

    # 벡터 저장소 구축
    if vector_store_type == "faiss":
        vector_store = FAISS.from_documents(documents, embedding_model)
    elif vector_store_type == "chroma":
        vector_store = Chroma.from_documents(documents, embedding_model)
    elif vector_store_type == "pinecone":
        # Pinecone API 설정 필요
        vector_store = Pinecone.from_documents(documents, embedding_model)
    
    return vector_store

4. 검색기(Retriever) 설정

  • 검색기 설정은 동일하며, 특정 벡터 저장소에 따라 다른 옵션을 설정할 수 있습니다.
from langchain.retrievers import VectorStoreRetriever

def setup_retriever(vector_store):
    # 검색기 설정
    retriever = VectorStoreRetriever(vectorstore=vector_store)
    return retriever

5. 모델 로드

  • 언어 모델을 로드하는 부분은 동일합니다. 필요한 경우, 추가적인 모델 옵션을 고려할 수 있습니다.
from langchain_openai import ChatOpenAI

def load_language_model(model_name="gpt-4", temperature=0.7):
    # ChatOpenAI 객체 생성
    gpt = ChatOpenAI(
        temperature=temperature,  # 창의성과 일관성을 조절
        model_name=model_name,  # 모델명
        streaming=True
    )
    return gpt

맞습니다. load_language_model 함수는 사용자가 선택하는 언어 모델에 따라 내부적으로 호출하는 함수가 달라질 수 있습니다. 현재 코드에서는 ChatOpenAI만 사용하고 있지만, 다른 모델(예: Hugging Face 모델, Anthropic Claude, Cohere 등)을 사용할 경우 다른 함수나 라이브러리를 사용해야 합니다.

아래는 다양한 모델 선택에 따라 적절한 함수나 객체를 반환하는 방식으로 load_language_model 함수를 확장한 예시입니다.

from langchain_openai import ChatOpenAI
from langchain_anthropic import Anthropic
from langchain_cohere import Cohere
from transformers import pipeline

def load_language_model(model_name="gpt-4", temperature=0.7, provider="openai"):
    if provider == "openai":
        # OpenAI 모델 로드
        gpt = ChatOpenAI(
            temperature=temperature,
            model_name=model_name,
            streaming=True
        )
    elif provider == "anthropic":
        # Anthropic Claude 모델 로드
        gpt = Anthropic(
            model=model_name,
            temperature=temperature,
            streaming=True
        )
    elif provider == "cohere":
        # Cohere 모델 로드
        gpt = Cohere(
            model=model_name,
            temperature=temperature,
            streaming=True
        )
    elif provider == "huggingface":
        # Hugging Face 모델 로드 (예: Transformers 사용)
        gpt = pipeline("text-generation", model=model_name)
    else:
        raise ValueError(f"Unknown provider: {provider}")
    
    return gpt

예시 사용:

  1. OpenAI 모델:

    gpt = load_language_model(model_name="gpt-4", provider="openai")
  2. Anthropic Claude 모델:

    gpt = load_language_model(model_name="claude-v1", provider="anthropic")
  3. Cohere 모델:

    gpt = load_language_model(model_name="command-xlarge-nightly", provider="cohere")
  4. Hugging Face Transformers 모델:

    gpt = load_language_model(model_name="gpt2", provider="huggingface")

설명:

  • OpenAI: ChatOpenAI 객체를 사용합니다.
  • Anthropic: Anthropic 객체를 사용합니다.
  • Cohere: Cohere 객체를 사용합니다.
  • Hugging Face: transformers 라이브러리의 pipeline을 사용하여 텍스트 생성 모델을 로드합니다.

이 방식으로, 사용자가 다양한 모델을 선택할 수 있으며, 각 모델에 적합한 함수를 호출하여 gpt 객체를 생성할 수 있습니다. 이렇게 하면 코드는 확장 가능하고 유연해져, 여러 모델을 사용하는 프로젝트에서 쉽게 관리할 수 있습니다.

6. 프롬프트 템플릿 및 결과 파서 설정

  • 모델의 출력을 적절한 형식으로 변환하기 위해 프롬프트 템플릿과 결과 파서를 설정합니다.
from langchain.prompts import PromptTemplate

def setup_prompt():
    # 프롬프트 템플릿 구성
    prompt_template = PromptTemplate.from_template(
        "Given the following context, answer the question: {context}\n\nQuestion: {question}\n\nAnswer:"
    )
    output_parser = StrOutputParser()
    return prompt_template, output_parser

7. 체인 구성

  • 프롬프트 템플릿, 모델, 결과 파서를 체인으로 연결합니다.
from langchain_core.output_parsers import StrOutputParser
from langchain.chains import RetrievalQA

def create_rag_chain(gpt, retriever, output_parser):
    rag_chain = RetrievalQA.from_chain_type(
        llm=gpt,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        output_parser=output_parser  # 체인에 결과 파서를 포함
    )
    return rag_chain

8. 체인 실행 및 스트리밍 출력

  • 사용자의 질문을 입력으로 받아 검색과 답변 생성이 이루어집니다. 스트리밍을 통해 실시간으로 출력을 받습니다.
from langchain_teddynote.messages import stream_response

def generate_answer(rag_chain, question):
    answer = rag_chain({"question": question})
    stream_response(answer["result"])  # 스트리밍 출력
    return answer

9. 체인 실행 및 오류 처리

  • 오류 처리와 같은 추가 기능을 포함합니다.
def main():
    try:
        # 1. 환경 설정
        initialize_environment()

        # 2. 문서 로드 및 분할
        documents = load_and_split_document(file_path="your_document.txt", file_type="text")

        # 3. 임베딩 생성 및 벡터 저장소 구축
        vector_store = create_vector_store(documents)

        # 4. 검색기 설정
        retriever = setup_retriever(vector_store)

        # 5. 모델 로드
        gpt = load_language_model()

        # 6. 프롬프트 템플릿 및 결과 파서 설정
        prompt_template, output_parser = setup_prompt()

        # 7. 체인 구성
        rag_chain = create_rag_chain(gpt, retriever, output_parser)

        # 8. 체인 실행 및 스트리밍 출력
        question = "What is the capital of South Korea?"
        generate_answer(rag_chain, question)
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()

10. LangSmith를 통한 모니터링 및 로깅

  • 추가로 LangSmith를 통해 추적하고 분석할 수 있는 로그 데이터의 예를 들어 설명합니다.
# Example of tracking model performance and response time
logging.langsmith("RAG-Project")
log_data = {
    "question": question,
    "response_time": response_time,  # Calculate response time
    "model_output": model_output,  # Log the actual output from the model
    "source_documents": source_documents  # Log the documents retrieved from the vector store
}
logging.log(log_data)

11. 대화 내용을 기억하는 RAG 체인

  • 대화 내용이 커질 경우 메모리 관리를 위한 방법을 설명합니다.
# 대화 기록 관리 함수에서 오래된 대화 내용 삭제
def manage_session_history(session_ids):
    if session_ids in store:
        # Example: Keep only the last 10 interactions to save memory
        store[session_ids].messages = store[session_ids].messages[-10:]

통합코드

from dotenv import load_dotenv
from langchain_teddynote import logging
import os
from langchain.document_loaders import PyMuPDFLoader, CSVLoader, WebBaseLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter, CodeTextSplitter
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma, Pinecone
from langchain.retrievers import VectorStoreRetriever
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.chains import RetrievalQA
from langchain_teddynote.messages import stream_response
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory


# 1. 환경 설정
def initialize_environment():
    load_dotenv()
    os.environ["TRANSFORMERS_CACHE"] = "./cache/"
    os.environ["HF_HOME"] = "./cache/"
    logging.langsmith("RAG-Project")
    print("LangSmith 추적을 시작합니다.")


# 2. 문서 로드 및 분할
def load_and_split_document(file_path, file_type="text"):
    if file_type == "pdf":
        loader = PyMuPDFLoader(file_path=file_path)
    elif file_type == "csv":
        loader = CSVLoader(file_path=file_path)
    elif file_type == "web":
        loader = WebBaseLoader(url=file_path)
    else:
        loader = TextLoader(file_path=file_path)
    
    documents = loader.load()

    if file_type == "code":
        text_splitter = CodeTextSplitter(chunk_size=1000, chunk_overlap=200)
    else:
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

    documents = text_splitter.split_documents(documents)
    return documents


# 3. 임베딩 생성 및 벡터 저장소 구축
def create_vector_store(documents, embedding_model="openai", vector_store_type="faiss"):
    if embedding_model == "openai":
        embedding_model = OpenAIEmbeddings()
    elif embedding_model == "huggingface":
        embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

    if vector_store_type == "faiss":
        vector_store = FAISS.from_documents(documents, embedding_model)
    elif vector_store_type == "chroma":
        vector_store = Chroma.from_documents(documents, embedding_model)
    elif vector_store_type == "pinecone":
        vector_store = Pinecone.from_documents(documents, embedding_model)
    
    return vector_store


# 4. 검색기 설정
def setup_retriever(vector_store):
    retriever = VectorStoreRetriever(vectorstore=vector_store)
    return retriever


# 5. 모델 로드 
def load_language_model(model_name="gpt-4", temperature=0.7, provider="openai"):
    if provider == "openai":
        # OpenAI 모델 로드
        gpt = ChatOpenAI(
            temperature=temperature,
            model_name=model_name,
            streaming=True
        )
    elif provider == "anthropic":
        # Anthropic Claude 모델 로드
        gpt = Anthropic(
            model=model_name,
            temperature=temperature,
            streaming=True
        )
    elif provider == "cohere":
        # Cohere 모델 로드
        gpt = Cohere(
            model=model_name,
            temperature=temperature,
            streaming=True
        )
    elif provider == "huggingface":
        # Hugging Face 모델 로드 (예: Transformers 사용)
        gpt = pipeline("text-generation", model=model_name)
    else:
        raise ValueError(f"Unknown provider: {provider}")
    
    return gpt
# 6. 프롬프트 템플릿 및 결과 파서 설정
def setup_prompt():
    prompt_template = PromptTemplate.from_template(
        "Given the following context, answer the question: {context}\n\nQuestion: {question}\n\nAnswer:"
    )
    output_parser = StrOutputParser()
    return prompt_template, output_parser


# 7. 체인 구성
def create_rag_chain(gpt, retriever, output_parser):
    rag_chain = RetrievalQA.from_chain_type(
        llm=gpt,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        output_parser=output_parser
    )
    return rag_chain


# 8. 체인 실행 및 스트리밍 출력
def generate_answer(rag_chain, question):
    answer = rag_chain({"question": question})
    stream_response(answer["result"])
    return answer


# 세션 기록을 관리하는 함수
store = {}

def manage_session_history(session_ids):
    if session_ids in store:
        store[session_ids].messages = store[session_ids].messages[-10:]

def get_session_history(session_ids):
    if session_ids not in store:
        store[session_ids] = ChatMessageHistory()
    manage_session_history(session_ids)
    return store[session_ids]


# 9. RAG 체인에 대화 기록을 추가
def create_rag_chain_with_history(gpt, retriever, output_parser):
    rag_chain = create_rag_chain(gpt, retriever, output_parser)
    return RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="chat_history"
    )


# 10. 체인 실행 및 오류 처리
def main():
    try:
        initialize_environment()

        documents = load_and_split_document(file_path="your_document.txt", file_type="text")

        vector_store = create_vector_store(documents)

        retriever = setup_retriever(vector_store)

        gpt = load_language_model()

        prompt_template, output_parser = setup_prompt()

        rag_chain = create_rag_chain_with_history(gpt, retriever, output_parser)

        question = "What is the capital of South Korea?"
        generate_answer(rag_chain, question)
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()

설명:

  • 문서 로드 및 분할: 다양한 문서 형식(PDF, CSV, 웹 등)에 대해 로드하고 텍스트를 분할합니다.
  • 임베딩 생성 및 벡터 저장소 구축: 다양한 임베딩 모델과 벡터 저장소(Faiss, Chroma, Pinecone 등)를 사용할 수 있도록 선택지를 제공합니다.
  • 체인 구성: 체인을 구성하는 과정에서 대화 기록을 관리하는 기능을 포함시켰습니다.
  • 오류 처리 및 메모리 관리: 오류 처리 및 대화 기록의 메모리 관리를 위한 기능도 포함시켰습니다.

참고자료

https://wikidocs.net/250954

profile
AI Engineer / 의료인공지능

0개의 댓글

관련 채용 정보