
Ingestion 단계는 시스템이 외부의 비정형 데이터를 내부 지식베이스로 흡수하는 과정을 의미한다. 인간이 음식을 섭취해 에너지원으로 소화·흡수하듯이, 시스템도 웹페이지 HTML, PDF, 로그 등 원본 데이터를 한곳에 모아 정제·분할·메타데이터 부착 등 전처리 과정을 거쳐 유용한 형태로 변환 및 저장한다.
이 과정을 자동화된 일괄 처리로 구현하면 한 번의 코드 실행만으로 수십에서 수백 개에 이르는 문서를 빠르게 수집하고, 미리 정의한 chunk size와 overlap 규칙에 따라 균일하게 분할해 둘 수 있다.
결과적으로 Ingestion 단계는 RAG 파이프라인의 출발점으로서 이후 검색·추론·생성 단계를 효율적이고 안정적으로 운영할 수 있는 토대를 제공한다.
① 환경 변수 로드 및 import
# ingestion.py
from dotenv import load_dotenv
load_dotenv()
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings
② 대상 URL 리스트 정의
# ingestion.py
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
위 배열은, 색인할 웹 문서의 URL을 나열한 것이다.
③ 웹 문서 불러오기 및 Flatten
# ingestion.py
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
각 URL에 대해 load()을 호출하여 Document 객체 리스트를 얻고, 중첩된 리스트를 1차원 리스트로 평탄화(flatten)한다.
④ 텍스트 분할 설정 및 실행
# ingestion.py
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
chunk_size=250 토큰 단위로 overlap 없이 텍스트를 자른다.
⑤ 기존 색인을 불러와 Retriever 초기화
# ingestion.py
retriever = Chroma(
collection_name="rag-chroma",
persist_directory="./.chroma",
embedding_function=OpenAIEmbeddings(),
).as_retriever()
한 번 생성된 rag-chroma 컬렉션을 로드하여, RAG 파이프라인의 검색용 인터페이스(Retriever)로 변환한다.
위와 같이, ingestion.py은 웹 문서 로드 → 텍스트 분할 → 벡터 인덱싱 및 로드의 순서로, RAG Workflow의 검색 단계에 필요한 지식 베이스를 준비하는 역할을 수행한다.
GraphState는 LangGraph 워크플로우의 Thought ‑ Action ‑ Observation 과정에서 사용자 질의, 검색 여부, 생성 결과, 참조 문서 정보를 한곳에 일관되게 보관하여 컨텍스트를 통합하는 핵심 저장소이다.
필요에 의해 새로운 필드를 추가해도 기존 코드 수정 없이 손쉽게 확장할 수 있어 유연성을 제공한다. 무엇보다 상태 객체 하나만으로도 현재까지의 질의 내용, LLM 생성 결과, 웹 검색 트리거 플래그, 관련 문서 리스트가 모두 포함되어 있으므로 디버깅과 로깅이 매우 편리하다.
# state.py
from typing import List, TypedDict
TypedDict를 상속하여, 이 클래스를 통해 생성되는 딕셔너리가 반드시 가져야 할 필수 키와 각 키의 value type을 명시한다. 이렇게 선언하면 오타 또는 잘못된 타입 할당을 사전에 검출할 수 있어 안정성을 보장한다.
# state.py
class GraphState(TypedDict):
"""
Represents the state of our graph.
Attributes:
question: question
generation: LLM generation
web_search: whether to add search
documents: list of documents
"""
question: str
generation: str
web_search: bool
documents: List[str]
question: str
: 사용자가 제시한 질의를 문자열 형태로 저장
generation: str
: LLM이 생성한 응답 텍스트를 기록하여, 이후 추가 생성이나 수정 시 참조
web_search: bool
: 에이전트가 추가 웹 검색을 수행할지 여부를 나타냄
documents: List[str]
: 현재 컨텍스트에서 참조할 문서의 URL이나 식별자를 문자열 리스트로 관리
LangGraph 워크플로우에서 Retrieve 단계는 LLM에 제공할 문맥을 실제로 가져오는 핵심 역할을 수행한다.
사용자의 질문만으로는 충분한 정보를 담보하기 어려우므로, 이 단계에서는 벡터 스토어에 저장된 과거 지식 조각을 검색하여 LLM이 참조할 추가 정보를 확보한다.
또한, 검색 기능을 별도 Node로 분리하여 평가와 디버깅이 용이해지고, 필요한 경우 다른 프로젝트나 테스트 환경에서도 재활용할 수 있어 전체 파이프라인을 모듈화·유연하게 관리할 수 있다.
① import
# retrieve.py
from typing import Any, Dict
from graph.state import GraphState
from ingestion import retriever
② 함수
# retrieve.py
def retrieve(state: GraphState) -> Dict[str, Any]:
print("---RETRIEVE---")
question = state["question"] # 질의 추출
documents = retriever.invoke(question) # 벡터 스토어 검색
return {"documents": documents, "question": question}
# LangGraph의 다음 노드에서는 위 반환값을 받아 문맥으로 활용한다
RAG Pipeline은 Retrieve 단계에서 벡터 유사도 기반으로 문서를 검색해 LLM에 문맥을 제공하지만, 단순 검색만으로는 질문과 무관한 노이즈가 섞일 수 있다.
이러한 불필요한 문서는 모델의 생성 과정에 혼선을 주어 답변의 정확도를 떨어뜨리고, 처리 비용을 불필요하게 증가시킨다.
따라서 검색된 문서를 질문과의 연관성 여부로 이진 판단(yes/no)하여 걸러내는 Relevance Filtering이 필요하다.
이 과정으로 LLM은 핵심적인 정보에만 집중할 수 있고, 만약 관련 문서가 부족하다면 추가 웹 검색을 자동으로 트리거하여 부족한 정보를 보완할 수 있다. 궁극적으로 정제된 Context을 제공하여 일관성 있고 고품질의 답변을 안정적으로 생성할 수 있다.
RAG 파이프라인에 공급되는 문서의 연관성을 자동으로 평가하여, 실제로 질문에 도움이 되는 문서만 후속 생성 단계에 전달하고, 관련 없는 문서가 섞여 있을 경우 추가 웹 검색을 트리거한다.
① import & LLM 초기화
# retrieval_grader.py
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field # 스키마 정의
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0)
② Pydantic 모델 정의
# retrieval_grader.py
class GradeDocuments(BaseModel):
"""Binary score for relevance check on retrieved documents."""
binary_score: str = Field(
description="Documents are relevant to the question, 'yes' or 'no'"
)
GradeDocuments은 LLM 출력의 스키마를 정의한다.binary_score 필드를 str 타입으로 선언하여, "yes" 또는 "no"만 허용하도록 문서화 및 검증한다.③ Structured Output Wrapper 구성
# retrieval_grader.py
structured_llm_grader = llm.with_structured_output(GradeDocuments)
llm 인스턴스에 GradeDocuments schema 적용{ "binary_score": "yes" } 또는 { "binary_score": "no" } 형태로 응답하도록 한다.④ Prompt Template 정의
# retrieval_grader.py
system = """You are a grader assessing relevance of a retrieved document to a user question. \n
If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
]
)
System 메시지: 문서가 질문과 연관 있는지 판단하는 역할을 설명
Human 메시지: 실제 평가할 document와 question 변수를 placeholder로 삽입하도록 지정
ChatPromptTemplate.from_message()로 두 메시지를 순차적으로 조합한 Prompt Template을 만든다.
⑤ chain 구성
# retrieval_grader.py
retrieval_grader = grade_prompt | structured_llm_grader
invoke()하면, 내부적으로 프롬프트에 document, question을 채워 LLM에 전달하고, LLM의 JSON 응답을 GradeDocuments 모델로 파싱하여 돌려준다.① 환경변수 로드 및 import
# test_chains.py
from dotenv import load_dotenv
load_dotenv()
from graph.chains.retrieval_grader import GradeDocuments, retrieval_grader
from ingestion import retriever
② 긍정 사례 테스트(yes)
# test_chains.py
def test_retrival_grader_answer_yes() -> None:
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
res: GradeDocuments = retrieval_grader.invoke(
{"question": question, "document": doc_txt}
)
assert res.binary_score == "yes"
binary_score가 "yes"인지 확인하여, 관련 문서를 제대로 식별하는지 검증③ 부정 사례 테스트(no)
# test_chains.py
def test_retrival_grader_answer_no() -> None:
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
res: GradeDocuments = retrieval_grader.invoke(
{"question": "how to make pizaa", "document": doc_txt}
)
assert res.binary_score == "no"
binary_score가 "no"인지 확인하여, 관련 없는 문서를 올바르게 걸러내는지 검증이렇게 두 가지 시나리오를 통해, retrieval_grader chain이 질문과 문서 간의 연관성을 정확하게 이진 분류(yes/no)하는지 자동으로 확인한다. 테스트가 모두 통과하면, 코드 변경이나 라이브러리 업데이트에도 해당 기능의 안정성이 보장된다는 것을 의미한다.
본 code는 Retrieve 단계에서 얻은 문서 리스트를 순회하며, 각 문서의 질문 연관성을 retrieval_grader로 평가한다.
그래서 관련 문서만 남기고 연관 없는 문서가 하나라도 있으면 web_search 플래그를 True로 설정해 추가 검색을 트리거하는 워크플로우 상태 업데이트 노드이다.
① import
# grade_documents.py
from typing import Any, Dict
from graph.chains.retrieval_grader import retrieval_grader
from graph.state import GraphState
② function
# grade_documents.py
def grade_documents(state: GraphState) -> Dict[str, Any]:
"""
Determines whether the retrieved documents are relevant to the question
If any document is not relevant, we will set a flag to run web search
Args:
state (dict): The current graph state
Returns:
state (dict): Filtered out irrelevant documents and updated web_search state
"""
print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
filtered_docs = []
web_search = False
for d in documents:
score = retrieval_grader.invoke(
{"question": question, "document": d.page_content}
)
grade = score.binary_score
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
web_search = True
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}
grade_documents 함수는 Retrieve 단계에서 넘어온 문서 리스트를 받아, 각 문서가 질문과 연관성이 있는지 retrieval_grader로 평가한 뒤 관련 문서만 선별한다. 하나라도 무관한 문서가 있으면 web_search 플래그를 True로 설정하여 추가 정보를 가져올 준비를 한다.
내부적으로는 상태 객체에서 질문과 문서 리스트를 꺼내 로그를 출력한 뒤, 반복문을 돌며 “yes”로 판정된 문서만 filtered_docs에 모으고 “no”가 나오면 웹 검색 필요 여부를 표시한다. 최종적으로는 필터링된 문서 리스트, 원본 질문, 그리고 웹 검색 플래그를 담은 새 상태를 반환하여 후속 노드가 이를 활용할 수 있도록 한다.
본 코드는 기존 벡터 스토어에서 관련 문서를 모두 걸러내고도 여전히 정보가 부족할 때, 외부 웹 검색을 통해 추가 컨텍스트를 확보하기 위해 사용된다.
LangGraph 워크플로우 내에서 web_search 노드를 실행하면, 질문을 기반으로 TavilySearch API를 호출하여 최신 웹 결과를 가져오고, 이를 Document 형태로 합쳐 기존 문서 리스트에 추가하여 후속 노드가 보다 풍부한 정보를 참조할 수 있게 한다.
① 환경 설정 및 도구 초기화
# web_search.py
from typing import Any, Dict
from langchain.schema import Document
from langchain_tavily import TavilySearch
from graph.state import GraphState
from dotenv import load_dotenv
load_dotenv()
web_search_tool = TavilySearch(max_results=3)
.env 파일에서 API 키 등을 불러오고, 최대 3 개 결과를 반환하는 TavilySearch 인스턴스를 만든다.
② 함수 시그니처
# web_search.py
def web_search(state: GraphState) -> Dict[str, Any]:
입력 state에는 question(str)과 현재까지 수집된 documents(List[Document] 또는 None)가 담겨 있다.
출력으로는 업데이트된 documents 리스트와 원본 question을 반환한다.
③ 검색 실행 및 결과 병합
# web_search.py
print("---WEB SEARCH---")
question = state["question"]
documents = state["documents"]
tavily_results = web_search_tool.invoke({"query": question})
joined_tavily_result = "\n".join(
[tavily_result["content"] for tavily_result in tavily_results["results"]]
)
web_results = Document(page_content=joined_tavily_result)
TavilySearch.invoke에 질문을 넘겨 웹 검색을 수행하고, 받은 결과들의 "content"를 줄바꿈으로 합친다.
합친 텍스트를 langchain.schema.Document 객체로 래핑한다.
④ 기존 문서 리스트에 추가
# web_search.py
if documents is not None:
documents.append(web_results)
else:
documents = [web_results]
⑤ 그 외
# web_search.py
return {"documents": documents, "question": question}
if __name__ == "__main__":
web_search(state={"question": "agent memory", "documents": None})
# generation.py
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0)
prompt = hub.pull("rlm/rag-prompt")
generation_chain = prompt | llm | StrOutputParser()
위 코드는 LangChain Hub에 저장된 RAG용 Prompt Template을 불러와 ChatOpenAI 모델과 연결한 뒤, 최종 출력만 깔끔한 문자열로 뽑아 내는 Generation Chain을 구성한다.
<참고 자료>
https://www.udemy.com/course/langgraph/?couponCode=KEEPLEARNING
https://github.com/emarco177/langgraph-course/commit/513e3cf42b3efc809dccb8a5149f6e8181aa8e6a
https://github.com/emarco177/langgraph-course/commit/03f79ae97fa601a7cd8b8abc52855a6f8095d867
https://github.com/emarco177/langgraph-course/commit/c2d71c79a8ab15d0e619bb7ca5adc75df49143de
https://github.com/emarco177/langgraph-course/commit/9107e7a90833f9c06b2e345ad348c759c34aa1b2
https://github.com/emarco177/langgraph-course/commit/6d4fdc4193f472e57ef9f26d187dd2d9bcc0a90b