LangGraph - Agentic RAG Flows 1

hyeony·2025년 7월 26일

LLM

목록 보기
3/4

2. LangChain Vector Store Ingestion Pipeline

가. Ingestion?

Ingestion 단계는 시스템이 외부의 비정형 데이터를 내부 지식베이스로 흡수하는 과정을 의미한다. 인간이 음식을 섭취해 에너지원으로 소화·흡수하듯이, 시스템도 웹페이지 HTML, PDF, 로그 등 원본 데이터를 한곳에 모아 정제·분할·메타데이터 부착 등 전처리 과정을 거쳐 유용한 형태로 변환 및 저장한다.

이 과정을 자동화된 일괄 처리로 구현하면 한 번의 코드 실행만으로 수십에서 수백 개에 이르는 문서를 빠르게 수집하고, 미리 정의한 chunk size와 overlap 규칙에 따라 균일하게 분할해 둘 수 있다.

결과적으로 Ingestion 단계는 RAG 파이프라인의 출발점으로서 이후 검색·추론·생성 단계를 효율적이고 안정적으로 운영할 수 있는 토대를 제공한다.

나. Code

① 환경 변수 로드 및 import

# ingestion.py

from dotenv import load_dotenv
load_dotenv()

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings

② 대상 URL 리스트 정의

# ingestion.py

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

위 배열은, 색인할 웹 문서의 URL을 나열한 것이다.

③ 웹 문서 불러오기 및 Flatten

# ingestion.py

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

각 URL에 대해 load()을 호출하여 Document 객체 리스트를 얻고, 중첩된 리스트를 1차원 리스트로 평탄화(flatten)한다.

④ 텍스트 분할 설정 및 실행

# ingestion.py

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

chunk_size=250 토큰 단위로 overlap 없이 텍스트를 자른다.

⑤ 기존 색인을 불러와 Retriever 초기화

# ingestion.py

retriever = Chroma(
    collection_name="rag-chroma",
    persist_directory="./.chroma",
    embedding_function=OpenAIEmbeddings(),
).as_retriever()

한 번 생성된 rag-chroma 컬렉션을 로드하여, RAG 파이프라인의 검색용 인터페이스(Retriever)로 변환한다.

위와 같이, ingestion.py웹 문서 로드 → 텍스트 분할 → 벡터 인덱싱 및 로드의 순서로, RAG Workflow의 검색 단계에 필요한 지식 베이스를 준비하는 역할을 수행한다.

3. Managing Information Flow in LangGraph: The GraphState

가. GraphState?

GraphStateLangGraph 워크플로우의 Thought ‑ Action ‑ Observation 과정에서 사용자 질의, 검색 여부, 생성 결과, 참조 문서 정보를 한곳에 일관되게 보관하여 컨텍스트를 통합하는 핵심 저장소이다.

필요에 의해 새로운 필드를 추가해도 기존 코드 수정 없이 손쉽게 확장할 수 있어 유연성을 제공한다. 무엇보다 상태 객체 하나만으로도 현재까지의 질의 내용, LLM 생성 결과, 웹 검색 트리거 플래그, 관련 문서 리스트가 모두 포함되어 있으므로 디버깅과 로깅이 매우 편리하다.

나. Code

# state.py

from typing import List, TypedDict

TypedDict를 상속하여, 이 클래스를 통해 생성되는 딕셔너리가 반드시 가져야 할 필수 키와 각 키의 value type을 명시한다. 이렇게 선언하면 오타 또는 잘못된 타입 할당을 사전에 검출할 수 있어 안정성을 보장한다.

# state.py

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """
    
    question: str
    generation: str
    web_search: bool
    documents: List[str]
  • question: str
    : 사용자가 제시한 질의를 문자열 형태로 저장

  • generation: str
    : LLM이 생성한 응답 텍스트를 기록하여, 이후 추가 생성이나 수정 시 참조

  • web_search: bool
    : 에이전트가 추가 웹 검색을 수행할지 여부를 나타냄

  • documents: List[str]
    : 현재 컨텍스트에서 참조할 문서의 URL이나 식별자를 문자열 리스트로 관리

4. Fetching Context for LLMs: The LangGraph Retrieve Node

가. Retrieve?

LangGraph 워크플로우에서 Retrieve 단계는 LLM에 제공할 문맥을 실제로 가져오는 핵심 역할을 수행한다.

사용자의 질문만으로는 충분한 정보를 담보하기 어려우므로, 이 단계에서는 벡터 스토어에 저장된 과거 지식 조각을 검색하여 LLM이 참조할 추가 정보를 확보한다.

또한, 검색 기능을 별도 Node로 분리하여 평가와 디버깅이 용이해지고, 필요한 경우 다른 프로젝트나 테스트 환경에서도 재활용할 수 있어 전체 파이프라인을 모듈화·유연하게 관리할 수 있다.

나. Code

① import

# retrieve.py

from typing import Any, Dict

from graph.state import GraphState
from ingestion import retriever

② 함수

# retrieve.py

def retrieve(state: GraphState) -> Dict[str, Any]:
    print("---RETRIEVE---")
    question = state["question"]	# 질의 추출

    documents = retriever.invoke(question) # 벡터 스토어 검색
    return {"documents": documents, "question": question}
    # LangGraph의 다음 노드에서는 위 반환값을 받아 문맥으로 활용한다

5. Relevance Filter for RAG using LangChain's Structured Output

가. Why Relevance Filter?

RAG PipelineRetrieve 단계에서 벡터 유사도 기반으로 문서를 검색해 LLM에 문맥을 제공하지만, 단순 검색만으로는 질문과 무관한 노이즈가 섞일 수 있다.

이러한 불필요한 문서는 모델의 생성 과정에 혼선을 주어 답변의 정확도를 떨어뜨리고, 처리 비용을 불필요하게 증가시킨다.

따라서 검색된 문서를 질문과의 연관성 여부로 이진 판단(yes/no)하여 걸러내는 Relevance Filtering이 필요하다.

이 과정으로 LLM은 핵심적인 정보에만 집중할 수 있고, 만약 관련 문서가 부족하다면 추가 웹 검색을 자동으로 트리거하여 부족한 정보를 보완할 수 있다. 궁극적으로 정제된 Context을 제공하여 일관성 있고 고품질의 답변을 안정적으로 생성할 수 있다.

나. Code for Retrieval Grader

1) Purpose

RAG 파이프라인에 공급되는 문서의 연관성을 자동으로 평가하여, 실제로 질문에 도움이 되는 문서만 후속 생성 단계에 전달하고, 관련 없는 문서가 섞여 있을 경우 추가 웹 검색을 트리거한다.

2) Code Implementation

① import & LLM 초기화

# retrieval_grader.py

from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field		# 스키마 정의
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

② Pydantic 모델 정의

# retrieval_grader.py

class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )
  • GradeDocuments은 LLM 출력의 스키마를 정의한다.
  • binary_score 필드를 str 타입으로 선언하여, "yes" 또는 "no"만 허용하도록 문서화 및 검증한다.

③ Structured Output Wrapper 구성

# retrieval_grader.py

structured_llm_grader = llm.with_structured_output(GradeDocuments)
  • 기존 llm 인스턴스에 GradeDocuments schema 적용
  • LLM이 항상 JSON 형식으로 { "binary_score": "yes" } 또는 { "binary_score": "no" } 형태로 응답하도록 한다.

④ Prompt Template 정의

# retrieval_grader.py

system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)
  • System 메시지: 문서가 질문과 연관 있는지 판단하는 역할을 설명

  • Human 메시지: 실제 평가할 documentquestion 변수를 placeholder로 삽입하도록 지정

  • ChatPromptTemplate.from_message()로 두 메시지를 순차적으로 조합한 Prompt Template을 만든다.

⑤ chain 구성

# retrieval_grader.py

retrieval_grader = grade_prompt | structured_llm_grader
  • 위 Chain을 invoke()하면, 내부적으로 프롬프트에 document, question을 채워 LLM에 전달하고, LLM의 JSON 응답을 GradeDocuments 모델로 파싱하여 돌려준다.

3) Test

① 환경변수 로드 및 import

# test_chains.py

from dotenv import load_dotenv

load_dotenv()

from graph.chains.retrieval_grader import GradeDocuments, retrieval_grader
from ingestion import retriever

② 긍정 사례 테스트(yes)

# test_chains.py

def test_retrival_grader_answer_yes() -> None:
    question = "agent memory"
    docs = retriever.invoke(question)
    doc_txt = docs[1].page_content

    res: GradeDocuments = retrieval_grader.invoke(
        {"question": question, "document": doc_txt}
    )

    assert res.binary_score == "yes"
  • “agent memory”라는 질문과 유관한 문서 chunk를 인자로 넣어 chain 실행
  • 반환된 binary_score"yes"인지 확인하여, 관련 문서를 제대로 식별하는지 검증

③ 부정 사례 테스트(no)

# test_chains.py

def test_retrival_grader_answer_no() -> None:
    question = "agent memory"
    docs = retriever.invoke(question)
    doc_txt = docs[1].page_content

    res: GradeDocuments = retrieval_grader.invoke(
        {"question": "how to make pizaa", "document": doc_txt}
    )

    assert res.binary_score == "no"
  • 같은 문서 chunk에 대해 완전히 무관한 질문인 “how to make pizaa”을 넣어 chain을 실행
  • 반환된 binary_score"no"인지 확인하여, 관련 없는 문서를 올바르게 걸러내는지 검증

이렇게 두 가지 시나리오를 통해, retrieval_grader chain이 질문과 문서 간의 연관성을 정확하게 이진 분류(yes/no)하는지 자동으로 확인한다. 테스트가 모두 통과하면, 코드 변경이나 라이브러리 업데이트에도 해당 기능의 안정성이 보장된다는 것을 의미한다.

다. Code for grade_documents

1) Purpose

본 code는 Retrieve 단계에서 얻은 문서 리스트를 순회하며, 각 문서의 질문 연관성을 retrieval_grader로 평가한다.

그래서 관련 문서만 남기고 연관 없는 문서가 하나라도 있으면 web_search 플래그를 True로 설정해 추가 검색을 트리거하는 워크플로우 상태 업데이트 노드이다.

2) Code Implementation

① import

# grade_documents.py

from typing import Any, Dict

from graph.chains.retrieval_grader import retrieval_grader
from graph.state import GraphState

② function

# grade_documents.py

def grade_documents(state: GraphState) -> Dict[str, Any]:
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    filtered_docs = []
    web_search = False
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = True
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

grade_documents 함수는 Retrieve 단계에서 넘어온 문서 리스트를 받아, 각 문서가 질문과 연관성이 있는지 retrieval_grader로 평가한 뒤 관련 문서만 선별한다. 하나라도 무관한 문서가 있으면 web_search 플래그를 True로 설정하여 추가 정보를 가져올 준비를 한다.

내부적으로는 상태 객체에서 질문과 문서 리스트를 꺼내 로그를 출력한 뒤, 반복문을 돌며 “yes”로 판정된 문서만 filtered_docs에 모으고 “no”가 나오면 웹 검색 필요 여부를 표시한다. 최종적으로는 필터링된 문서 리스트, 원본 질문, 그리고 웹 검색 플래그를 담은 새 상태를 반환하여 후속 노드가 이를 활용할 수 있도록 한다.

6. Implementing a Web Search Node in LangGraph using Tavily API

가. Purpose

본 코드는 기존 벡터 스토어에서 관련 문서를 모두 걸러내고도 여전히 정보가 부족할 때, 외부 웹 검색을 통해 추가 컨텍스트를 확보하기 위해 사용된다.

LangGraph 워크플로우 내에서 web_search 노드를 실행하면, 질문을 기반으로 TavilySearch API를 호출하여 최신 웹 결과를 가져오고, 이를 Document 형태로 합쳐 기존 문서 리스트에 추가하여 후속 노드가 보다 풍부한 정보를 참조할 수 있게 한다.

나. Code Implementation

① 환경 설정 및 도구 초기화

# web_search.py

from typing import Any, Dict

from langchain.schema import Document
from langchain_tavily import TavilySearch

from graph.state import GraphState
from dotenv import load_dotenv

load_dotenv()
web_search_tool = TavilySearch(max_results=3)

.env 파일에서 API 키 등을 불러오고, 최대 3 개 결과를 반환하는 TavilySearch 인스턴스를 만든다.

② 함수 시그니처

# web_search.py

def web_search(state: GraphState) -> Dict[str, Any]:
  • 입력 state에는 question(str)과 현재까지 수집된 documents(List[Document] 또는 None)가 담겨 있다.

  • 출력으로는 업데이트된 documents 리스트와 원본 question을 반환한다.

③ 검색 실행 및 결과 병합

# web_search.py

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    tavily_results = web_search_tool.invoke({"query": question})
    joined_tavily_result = "\n".join(
    	[tavily_result["content"] for tavily_result in tavily_results["results"]]
    )
    web_results = Document(page_content=joined_tavily_result)
  • TavilySearch.invoke에 질문을 넘겨 웹 검색을 수행하고, 받은 결과들의 "content"를 줄바꿈으로 합친다.

  • 합친 텍스트를 langchain.schema.Document 객체로 래핑한다.

④ 기존 문서 리스트에 추가

# web_search.py

    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
  • 이전에 수집된 문서가 있으면 뒤에 붙이고, 없으면 새 리스트를 만들어 할당한다.

⑤ 그 외

# web_search.py

    return {"documents": documents, "question": question}

if __name__ == "__main__":
    web_search(state={"question": "agent memory", "documents": None})
  • 업데이트된 문서 리스트와 질문을 포함한 딕셔너리를 반환하여, 워크플로우 다음 단계에서 사용 가능하도록 한다.

7. Generation

# generation.py

from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)
prompt = hub.pull("rlm/rag-prompt")

generation_chain = prompt | llm | StrOutputParser()

위 코드는 LangChain Hub에 저장된 RAG용 Prompt Template을 불러와 ChatOpenAI 모델과 연결한 뒤, 최종 출력만 깔끔한 문자열로 뽑아 내는 Generation Chain을 구성한다.

<참고 자료>
https://www.udemy.com/course/langgraph/?couponCode=KEEPLEARNING
https://github.com/emarco177/langgraph-course/commit/513e3cf42b3efc809dccb8a5149f6e8181aa8e6a
https://github.com/emarco177/langgraph-course/commit/03f79ae97fa601a7cd8b8abc52855a6f8095d867
https://github.com/emarco177/langgraph-course/commit/c2d71c79a8ab15d0e619bb7ca5adc75df49143de
https://github.com/emarco177/langgraph-course/commit/9107e7a90833f9c06b2e345ad348c759c34aa1b2
https://github.com/emarco177/langgraph-course/commit/6d4fdc4193f472e57ef9f26d187dd2d9bcc0a90b

profile
Chung-Ang Univ. EEE.

0개의 댓글