LangChain 완전 정복 시리즈 (2편)

당니·2026년 1월 20일

LLM

목록 보기
5/19
post-thumbnail

2편: LCEL로 복잡한 Chain 구성하기

시작하며

기본적인 RAG 구조만으로도 동작은 하지만, 실제 활용 사례를 살펴보면 단순한 파이프라인으로는 한계가 드러나는 경우가 많습니다. 검색 결과가 항상 기대한 만큼 정확하지 않거나, 상황에 따라 여러 데이터 소스를 동시에 조회해야 하는 요구가 생기기도 합니다.

이번 편에서는 이런 실무적인 요구를 어떻게 구조적으로 풀어낼 수 있는지를 살펴보려고 합니다. LCEL(LangChain Expression Language)을 활용해 조건에 따라 흐름을 분기하고, 여러 작업을 병렬로 처리하며, 특정 단계가 실패했을 때 대안 경로를 실행하는 방식까지 다뤄볼 예정입니다. 기본 RAG를 넘어, 보다 실전적인 파이프라인을 구성하는 데 초점을 맞춰보겠습니다.


LCEL을 사용하는 이유

처음엔 RetrievalQA만 써도 충분할 것 같았는데, 요구사항이 복잡해지면서 한계가 보이기 시작했습니다. "검색이 안 되면 다른 방법을 써주세요", "이 질문은 다른 데이터베이스에서 찾아주세요" 같은 걸 구현하려니 코드가 복잡해지더라고요.

LCEL의 주요 장점은 다음과 같습니다. 파이프(|) 연산자로 데이터 흐름을 직관적으로 볼 수 있고, 각 컴포넌트를 독립적으로 테스트할 수 있습니다. 또한 스트리밍이나 비동기 처리가 별도 코드 없이 자동으로 지원됩니다.


기본 문법 정리

1편에서 간단히 다뤘지만, 다시 한번 정리하고 넘어가겠습니다.

from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 파이프 연산자로 연결
simple_chain = (
    ChatPromptTemplate.from_template("Tell me a joke about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

result = simple_chain.invoke({"topic": "programming"})
print(result)

파이프(|)는 왼쪽의 출력을 오른쪽의 입력으로 전달합니다. Unix 파이프와 같은 개념입니다.


병렬 검색 구현하기

여러 작업을 동시에 실행해야 할 때가 있습니다. 예를 들어 회사 내부 문서와 위키를 동시에 검색하는 경우입니다.

from langchain_core.runnables import RunnableParallel
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# 두 개의 벡터 스토어 준비
company_db = Chroma(
    persist_directory="./company_docs",
    embedding_function=OpenAIEmbeddings()
)

wiki_db = Chroma(
    persist_directory="./wiki_docs",
    embedding_function=OpenAIEmbeddings()
)

company_retriever = company_db.as_retriever(search_kwargs={"k": 3})
wiki_retriever = wiki_db.as_retriever(search_kwargs={"k": 3})

def format_docs(docs):
    return "\n\n".join(f"[{i+1}] {doc.page_content}" for i, doc in enumerate(docs))

# 병렬 검색 구성
parallel_search = RunnableParallel({
    "company_docs": company_retriever | format_docs,
    "wiki_docs": wiki_retriever | format_docs,
    "question": RunnablePassthrough()
})

prompt = ChatPromptTemplate.from_template("""
회사 문서:
{company_docs}

위키 문서:
{wiki_docs}

질문: {question}

답변 시 정보의 출처를 명시해주세요.
""")

# 최종 체인
multi_source_chain = (
    parallel_search
    | prompt
    | ChatOpenAI(model="gpt-4o-mini", temperature=0)
    | StrOutputParser()
)

response = multi_source_chain.invoke("LangChain의 주요 특징은 무엇인가요?")
print(response)

두 retriever가 동시에 실행되어 검색 시간을 절반으로 줄일 수 있습니다.


조건부 라우팅 만들기

질문 유형에 따라 다른 데이터베이스를 검색하고 싶을 때 사용하는 패턴입니다. 인사 관련 질문은 HR 데이터베이스에서, 기술 관련 질문은 기술 문서에서 검색하는 방식입니다.

# 질문 분류기
classifier_prompt = ChatPromptTemplate.from_template("""
다음 질문의 카테고리를 'HR', 'TECH', 'GENERAL' 중 하나로 분류해주세요.

질문: {question}

카테고리:""")

classifier = (
    classifier_prompt
    | ChatOpenAI(model="gpt-4o-mini", temperature=0)
    | StrOutputParser()
)

# 카테고리별 라우팅 함수
def route_to_database(info):
    question = info["question"]
    category = info["category"].strip().upper()
    
    if "HR" in category:
        docs = hr_retriever.invoke(question)
    elif "TECH" in category:
        docs = tech_retriever.invoke(question)
    else:
        docs = general_retriever.invoke(question)
    
    return {
        "context": format_docs(docs),
        "question": question,
        "category": category
    }

# 전체 체인
routing_chain = (
    {
        "question": RunnablePassthrough(),
        "category": classifier
    }
    | RunnablePassthrough.assign(retrieval=route_to_database)
    | ChatPromptTemplate.from_template("""
카테고리: {category}

참고 문서:
{context}

질문: {question}

답변:""")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

print(routing_chain.invoke("연차 신청 방법을 알려주세요"))  # HR 라우팅
print(routing_chain.invoke("API 인증은 어떻게 하나요?"))  # TECH 라우팅

폴백 전략 구현하기

검색이 실패했을 때 다른 방법을 자동으로 시도하도록 구성할 수 있습니다.

from langchain_core.runnables import RunnableLambda

def check_quality(docs):
    """검색 결과 품질 확인"""
    if not docs or len(docs) == 0:
        return False
    # 실제 환경에서는 유사도 점수 등을 확인합니다
    return True

# 1차: 벡터 검색
def vector_search(question):
    docs = vectorstore.as_retriever().invoke(question)
    if check_quality(docs):
        return {"context": format_docs(docs), "method": "벡터 검색"}
    raise Exception("벡터 검색 실패")

# 2차: 키워드 검색
def keyword_search(question):
    docs = bm25_retriever.invoke(question)
    if check_quality(docs):
        return {"context": format_docs(docs), "method": "키워드 검색"}
    raise Exception("키워드 검색 실패")

# 3차: LLM 일반 지식 활용
def use_llm_knowledge(question):
    return {
        "context": "검색 결과가 없어 일반 지식으로 답변합니다.",
        "method": "LLM 지식"
    }

# 폴백 체인 구성
retrieval_with_fallback = RunnableLambda(vector_search).with_fallbacks([
    RunnableLambda(keyword_search),
    RunnableLambda(use_llm_knowledge)
])

fallback_chain = (
    {
        "question": RunnablePassthrough(),
        "retrieval": retrieval_with_fallback
    }
    | ChatPromptTemplate.from_template("""
검색 방법: {retrieval[method]}

참고 자료:
{retrieval[context]}

질문: {question}

답변:""")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

response = fallback_chain.invoke("특정 질문")

벡터 검색이 실패하면 자동으로 키워드 검색을 시도하고, 그것도 실패하면 LLM의 일반 지식으로 답변합니다.


Multi-Query 패턴

하나의 질문을 여러 방식으로 변형하여 검색하면 더 정확한 결과를 얻을 수 있습니다.

# 질문 변형 생성
multi_query_prompt = ChatPromptTemplate.from_template("""
다음 질문을 의미는 같지만 다른 표현으로 3가지 버전을 만들어주세요.

원래 질문: {question}

변형된 질문들:
1.""")

def parse_queries(output: str) -> list:
    """LLM 출력에서 질문들 추출"""
    lines = output.strip().split('\n')
    queries = []
    for line in lines:
        if line.strip() and line[0].isdigit():
            query = line.split('.', 1)[1].strip()
            queries.append(query)
    return queries[:3]

# 여러 질문으로 검색 후 결과 통합
def search_and_combine(queries: list):
    all_docs = []
    seen = set()
    
    for q in queries:
        docs = vectorstore.as_retriever().invoke(q)
        for doc in docs:
            content_hash = hash(doc.page_content)
            if content_hash not in seen:
                seen.add(content_hash)
                all_docs.append(doc)
    
    return all_docs[:10]

# Multi-Query 체인
multi_query_chain = (
    {"question": RunnablePassthrough()}
    | RunnablePassthrough.assign(
        alternative_questions=(
            multi_query_prompt
            | ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
            | StrOutputParser()
            | parse_queries
        )
    )
    | RunnablePassthrough.assign(
        documents=lambda x: search_and_combine([x["question"]] + x["alternative_questions"])
    )
    | RunnablePassthrough.assign(
        context=lambda x: format_docs(x["documents"])
    )
    | ChatPromptTemplate.from_template("""
사용된 검색 쿼리:
- {question}
{alternative_questions}

검색 결과:
{context}

원래 질문: {question}

답변:""")
    | ChatOpenAI(model="gpt-4o-mini", temperature=0)
    | StrOutputParser()
)

response = multi_query_chain.invoke("회사 복지 제도에는 어떤 것이 있나요?")
print(response)

"회사 복지 제도에는 어떤 것이 있나요?"가 "직원 혜택은 무엇인가요?", "복리후생 프로그램을 알려주세요" 등의 질문으로 변형되어 더 풍부한 검색 결과를 얻을 수 있습니다.


Self-Query Retriever

질문에서 메타데이터 필터를 자동으로 추출하여 정밀 검색을 수행하는 방식입니다.

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

# 메타데이터 정보 정의
metadata_info = [
    AttributeInfo(
        name="department",
        description="문서가 속한 부서 (HR, Engineering, Sales 등)",
        type="string",
    ),
    AttributeInfo(
        name="year",
        description="문서 작성 연도",
        type="integer",
    ),
    AttributeInfo(
        name="doc_type",
        description="문서 유형 (policy, guide, announcement)",
        type="string",
    ),
]

# Self-Query Retriever 생성
self_query_retriever = SelfQueryRetriever.from_llm(
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
    vectorstore=vectorstore,
    document_contents="회사 내부 문서 및 정책",
    metadata_field_info=metadata_info,
)

# 사용 예시
docs = self_query_retriever.invoke("2023년 HR 부서의 휴가 정책을 알려주세요")
# 자동으로 department='HR', year=2023 필터가 적용됩니다

print(format_docs(docs))

LLM이 질문을 분석하여 "department='HR', year=2023" 같은 필터 조건을 자동으로 추출합니다.


답변 검증 파이프라인

RAG의 주요 문제 중 하나인 환각(hallucination)을 방지하기 위한 검증 체인입니다.

# 답변 생성
answer_prompt = ChatPromptTemplate.from_template("""
참고 자료: {context}
질문: {question}

답변:""")

# 답변 검증
verification_prompt = ChatPromptTemplate.from_template("""
다음 답변이 제공된 참고 자료에만 기반하고 있는지 확인해주세요.

참고 자료:
{context}

답변:
{answer}

이 답변은 참고 자료에만 기반하고 있나요? YES 또는 NO로 답하고 간단한 이유를 작성해주세요.
""")

def process_verification(info):
    verification = info["verification"].upper()
    
    if "YES" in verification:
        return info["answer"]
    else:
        return f"⚠️ 검증되지 않은 답변\n\n{info['answer']}\n\n검증 메모: {verification}"

# 검증 체인
verified_chain = (
    {
        "context": vectorstore.as_retriever() | format_docs,
        "question": RunnablePassthrough()
    }
    | RunnablePassthrough.assign(
        answer=(
            answer_prompt
            | ChatOpenAI(model="gpt-4o-mini", temperature=0)
            | StrOutputParser()
        )
    )
    | RunnablePassthrough.assign(
        verification=(
            verification_prompt
            | ChatOpenAI(model="gpt-4o-mini", temperature=0)
            | StrOutputParser()
        )
    )
    | RunnableLambda(process_verification)
)

response = verified_chain.invoke("CEO의 이름은 무엇인가요?")
print(response)

스트리밍과 배치 처리

LCEL의 장점 중 하나는 별도 코드 없이 스트리밍과 배치 처리를 지원한다는 점입니다.

# 스트리밍 응답
for chunk in multi_query_chain.stream("회사 복지 제도는?"):
    print(chunk, end="", flush=True)

# 여러 질문 동시 처리
questions = [
    "연차는 며칠인가요?",
    "재택근무가 가능한가요?",
    "점심 식사가 제공되나요?"
]

results = multi_query_chain.batch(questions)
for q, r in zip(questions, results):
    print(f"질문: {q}\n답변: {r}\n")

비동기 실행

대량의 요청을 처리할 때는 비동기 방식이 효율적입니다.

import asyncio

async def process_multiple_questions():
    questions = ["질문1", "질문2", "질문3"]
    results = await multi_query_chain.abatch(questions)
    
    for q, r in zip(questions, results):
        print(f"질문: {q}\n답변: {r}\n")

asyncio.run(process_multiple_questions())

디버깅과 모니터링

복잡한 체인의 디버깅을 위해 LangSmith를 활용할 수 있습니다.

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"

# 이제 모든 체인 실행이 LangSmith에 기록됩니다
response = multi_query_chain.invoke("질문")

LangSmith 대시보드에서 각 단계의 입출력, 실행 시간, 토큰 사용량을 확인할 수 있습니다.


실전 활용 팁

체인 설계 원칙

체인이 너무 복잡해지면 여러 개의 작은 체인으로 나누는 것이 좋습니다. 테스트와 유지보수가 훨씬 쉬워집니다.

타입 힌트 활용

from typing import TypedDict

class RAGInput(TypedDict):
    question: str

class RAGOutput(TypedDict):
    answer: str
    sources: list[str]

타입 힌트를 사용하면 체인의 입출력 구조를 명확하게 정의할 수 있습니다.

안정성 향상

with_fallbacks()를 적극 활용하여 시스템의 안정성을 높이세요.

성능 측정

LangSmith나 커스텀 콜백을 사용하여 각 단계의 실행 시간을 측정하고 병목 지점을 파악하세요.


마무리하며

LCEL은 처음에는 다소 낯설게 느껴질 수 있지만, 익숙해지면 복잡한 로직을 매우 간결하게 표현할 수 있습니다. 마치 레고 블록을 조립하듯 각 컴포넌트를 조합하여 강력한 시스템을 구축할 수 있습니다.

이번 편에서 다룬 패턴들은 실제 프로젝트에서 바로 활용 가능합니다. 멀티 소스 검색, 조건부 라우팅, 폴백 전략, Multi-Query 등은 프로덕션 수준의 RAG 시스템에서 자주 사용되는 패턴들입니다.


이제 복잡한 파이프라인을 구성하는 방법을 익혔으니, 다음 단계로 넘어가볼까요?

3편 "LangChain Agent 실전 활용"에서는 스스로 도구를 선택하고 계획을 세워 실행하는 Agent를 만들어보겠습니다. 데이터베이스 조회, API 호출, 파일 처리 등 실제 업무를 자동화하는 Agent의 세계로 들어가봅시다.

다음 편에서 뵙겠습니다!

profile
👩🏻‍💻

0개의 댓글