[LangChain] LangChain 완전 정복 시리즈 (4편)

당니·2026년 1월 20일

LLM

목록 보기
8/19
post-thumbnail

4편: 실전 프로젝트 - 사내 문서 검색 챗봇 구축기

시작하며

지금까지 RAG, LCEL, Agent를 각각 배웠는데요, 이제 이 모든 것을 종합해서 실제로 사용 가능한 시스템을 만들어보겠습니다. 단순히 코드만 작성하는 게 아니라 기획부터 배포까지 전체 과정을 다룹니다.

이번 프로젝트의 목표는 회사 내부 문서(정책, 매뉴얼, 공지사항 등)를 자동으로 검색하고 답변하는 챗봇입니다. 직원들이 "연차 신청 어떻게 해요?", "회의실 예약 방법은?" 같은 질문을 하면 관련 문서를 찾아서 답변해주는 시스템이죠.


프로젝트 요구사항 정리

핵심 기능

먼저 어떤 기능이 필요한지 정리해봤습니다.

다양한 형식의 문서 처리가 필요합니다. PDF, Word, Excel, 마크다운, 웹페이지 등 회사에서 사용하는 모든 형식을 지원해야 하고요. 자연어로 질문하면 관련 문서를 검색해서 답변하는 기능이 기본입니다. 검색 결과가 없으면 "모르겠습니다"라고 솔직하게 말해야 하고요.

답변의 출처를 명확히 보여줘야 합니다. "어디서 찾은 정보인지" 알려줘야 신뢰도가 높아지거든요. 대화 기록을 유지해서 맥락 있는 대화가 가능해야 하고, 특정 부서나 날짜로 필터링하는 기능도 있으면 좋겠습니다.

기술 스택

프로젝트에 사용할 기술 스택입니다.

백엔드는 FastAPI로 REST API를 만들고, LangChain으로 RAG 파이프라인을 구성합니다. 벡터 데이터베이스는 Qdrant를 사용하고요(무료고 로컬 실행이 가능해서), LLM은 OpenAI GPT-4를 사용하되 비용이 부담되면 GPT-3.5로 전환할 수 있게 만들겠습니다.

프론트엔드는 간단하게 Streamlit으로 웹 인터페이스를 만들고, 배포는 Docker로 컨테이너화해서 AWS나 사내 서버에 올릴 수 있게 준비합니다.


프로젝트 구조

먼저 전체 프로젝트 구조를 잡겠습니다.

chatbot/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI 앱
│   ├── config.py            # 설정
│   ├── models.py            # 데이터 모델
│   └── api/
│       ├── __init__.py
│       ├── chat.py          # 채팅 엔드포인트
│       └── documents.py     # 문서 관리 엔드포인트
├── core/
│   ├── __init__.py
│   ├── document_processor.py   # 문서 처리
│   ├── vectorstore.py          # 벡터 스토어 관리
│   ├── rag_chain.py            # RAG 체인
│   └── agent.py                # Agent 로직
├── utils/
│   ├── __init__.py
│   ├── file_handlers.py     # 파일 핸들러
│   └── logging.py           # 로깅
├── data/
│   ├── raw/                 # 원본 문서
│   ├── processed/           # 처리된 문서
│   └── vectorstore/         # 벡터 DB
├── frontend/
│   └── streamlit_app.py     # Streamlit UI
├── tests/
│   └── ...
├── requirements.txt
├── Dockerfile
└── docker-compose.yml

1단계: 문서 처리 파이프라인

다양한 형식의 문서를 처리하는 모듈부터 만들겠습니다.

# core/document_processor.py
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredExcelLoader,
    UnstructuredMarkdownLoader,
    WebBaseLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
import os
from pathlib import Path

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
    
    def load_document(self, file_path: str):
        """파일 형식에 따라 적절한 로더 선택"""
        file_ext = Path(file_path).suffix.lower()
        
        loaders = {
            '.pdf': PyPDFLoader,
            '.docx': UnstructuredWordDocumentLoader,
            '.doc': UnstructuredWordDocumentLoader,
            '.xlsx': UnstructuredExcelLoader,
            '.xls': UnstructuredExcelLoader,
            '.md': UnstructuredMarkdownLoader,
        }
        
        loader_class = loaders.get(file_ext)
        if not loader_class:
            raise ValueError(f"지원하지 않는 파일 형식: {file_ext}")
        
        loader = loader_class(file_path)
        return loader.load()
    
    def process_documents(self, file_paths: List[str]):
        """여러 문서를 처리하고 청크로 분할"""
        all_documents = []
        
        for file_path in file_paths:
            try:
                print(f"처리 중: {file_path}")
                docs = self.load_document(file_path)
                
                # 메타데이터 추가
                for doc in docs:
                    doc.metadata['source'] = file_path
                    doc.metadata['filename'] = os.path.basename(file_path)
                    # 부서, 날짜 등 추가 메타데이터는 파일명이나 내용에서 추출
                    doc.metadata['department'] = self._extract_department(file_path)
                    doc.metadata['doc_type'] = self._extract_doc_type(file_path)
                
                all_documents.extend(docs)
            except Exception as e:
                print(f"오류 발생 ({file_path}): {e}")
                continue
        
        # 텍스트 분할
        splits = self.text_splitter.split_documents(all_documents)
        print(f"총 {len(splits)}개 청크 생성됨")
        
        return splits
    
    def _extract_department(self, file_path: str) -> str:
        """파일 경로에서 부서 정보 추출"""
        # 예: data/raw/HR/policy.pdf -> HR
        parts = Path(file_path).parts
        if len(parts) > 2:
            return parts[-2]
        return "general"
    
    def _extract_doc_type(self, file_path: str) -> str:
        """파일명에서 문서 유형 추출"""
        filename = os.path.basename(file_path).lower()
        
        if 'policy' in filename or '정책' in filename:
            return 'policy'
        elif 'guide' in filename or '가이드' in filename:
            return 'guide'
        elif 'announcement' in filename or '공지' in filename:
            return 'announcement'
        else:
            return 'general'

2단계: 벡터 스토어 구축

처리된 문서를 벡터 데이터베이스에 저장합니다.

# core/vectorstore.py
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from typing import List, Optional
import os

class VectorStoreManager:
    def __init__(
        self,
        collection_name: str = "company_docs",
        persist_directory: str = "./data/vectorstore"
    ):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings()
        
        # Qdrant 클라이언트 초기화
        self.client = QdrantClient(path=persist_directory)
        
    def create_vectorstore(self, documents: List):
        """문서로부터 벡터 스토어 생성"""
        # 컬렉션이 이미 있으면 삭제
        try:
            self.client.delete_collection(self.collection_name)
        except:
            pass
        
        # 벡터 스토어 생성
        vectorstore = Qdrant.from_documents(
            documents=documents,
            embedding=self.embeddings,
            path=self.persist_directory,
            collection_name=self.collection_name,
        )
        
        print(f"벡터 스토어 생성 완료: {len(documents)}개 문서")
        return vectorstore
    
    def load_vectorstore(self):
        """기존 벡터 스토어 로드"""
        vectorstore = Qdrant(
            client=self.client,
            collection_name=self.collection_name,
            embeddings=self.embeddings,
        )
        return vectorstore
    
    def add_documents(self, documents: List):
        """기존 벡터 스토어에 문서 추가"""
        vectorstore = self.load_vectorstore()
        vectorstore.add_documents(documents)
        print(f"{len(documents)}개 문서 추가 완료")
    
    def search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None
    ):
        """벡터 스토어 검색"""
        vectorstore = self.load_vectorstore()
        
        if filter:
            # 메타데이터 필터링
            results = vectorstore.similarity_search(
                query,
                k=k,
                filter=filter
            )
        else:
            results = vectorstore.similarity_search(query, k=k)
        
        return results

3단계: RAG 체인 구성

고급 RAG 기능을 구현합니다.

# core/rag_chain.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain.memory import ConversationBufferMemory
from typing import List, Dict

class RAGChain:
    def __init__(self, vectorstore_manager, model_name: str = "gpt-4o-mini"):
        self.vectorstore_manager = vectorstore_manager
        self.llm = ChatOpenAI(model=model_name, temperature=0)
        self.memory = ConversationBufferMemory(
            return_messages=True,
            memory_key="chat_history"
        )
        
        # 프롬프트 템플릿
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """당신은 회사 내부 문서를 검색하여 답변하는 AI 어시스턴트입니다.

주요 원칙:
1. 제공된 문서에만 기반하여 답변하세요
2. 문서에 정보가 없으면 솔직히 "모르겠습니다"라고 말하세요
3. 답변의 출처를 명시하세요 (파일명 또는 문서 제목)
4. 친절하고 명확하게 답변하세요

참고 문서:
{context}"""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("user", "{question}")
        ])
    
    def format_docs(self, docs: List) -> str:
        """문서를 포맷팅"""
        formatted = []
        for i, doc in enumerate(docs, 1):
            source = doc.metadata.get('filename', '알 수 없음')
            content = doc.page_content
            formatted.append(f"[문서 {i}: {source}]\n{content}")
        
        return "\n\n".join(formatted)
    
    def get_retriever(self, k: int = 4, filter: dict = None):
        """Retriever 반환"""
        vectorstore = self.vectorstore_manager.load_vectorstore()
        
        if filter:
            retriever = vectorstore.as_retriever(
                search_kwargs={"k": k, "filter": filter}
            )
        else:
            retriever = vectorstore.as_retriever(search_kwargs={"k": k})
        
        return retriever
    
    def create_chain(self, filter: dict = None):
        """RAG 체인 생성"""
        retriever = self.get_retriever(filter=filter)
        
        chain = (
            {
                "context": retriever | self.format_docs,
                "question": RunnablePassthrough(),
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"]
            }
            | self.prompt
            | self.llm
            | StrOutputParser()
        )
        
        return chain
    
    def invoke(self, question: str, filter: dict = None) -> Dict:
        """질문에 대한 답변 생성"""
        chain = self.create_chain(filter=filter)
        
        # 관련 문서 검색
        retriever = self.get_retriever(filter=filter)
        source_docs = retriever.invoke(question)
        
        # 답변 생성
        answer = chain.invoke(question)
        
        # 메모리 업데이트
        self.memory.save_context(
            {"input": question},
            {"output": answer}
        )
        
        # 출처 정보 추출
        sources = [
            {
                "filename": doc.metadata.get('filename', '알 수 없음'),
                "department": doc.metadata.get('department', 'general'),
                "content_preview": doc.page_content[:200]
            }
            for doc in source_docs
        ]
        
        return {
            "answer": answer,
            "sources": sources,
            "question": question
        }
    
    def clear_history(self):
        """대화 기록 초기화"""
        self.memory.clear()

4단계: Multi-Query 및 검증 추가

검색 정확도를 높이고 답변을 검증하는 기능을 추가합니다.

# core/rag_chain.py에 추가

class AdvancedRAGChain(RAGChain):
    def __init__(self, vectorstore_manager, model_name: str = "gpt-4o-mini"):
        super().__init__(vectorstore_manager, model_name)
        
        # Multi-Query 프롬프트
        self.multi_query_prompt = ChatPromptTemplate.from_template("""
사용자 질문을 다른 방식으로 3가지 버전을 만들어주세요.
의미는 같지만 표현을 다르게 해주세요.

원래 질문: {question}

변형된 질문들 (번호 없이, 한 줄에 하나씩):""")
        
        # 검증 프롬프트
        self.verification_prompt = ChatPromptTemplate.from_template("""
다음 답변이 제공된 참고 문서에만 기반하고 있는지 확인해주세요.

참고 문서:
{context}

답변:
{answer}

이 답변은 참고 문서에만 기반하고 있나요? 
YES 또는 NO로 답하고, NO인 경우 어느 부분이 문제인지 설명해주세요.""")
    
    def generate_alternative_queries(self, question: str) -> List[str]:
        """질문의 변형 생성"""
        chain = self.multi_query_prompt | self.llm | StrOutputParser()
        result = chain.invoke({"question": question})
        
        # 각 줄을 쿼리로 분리
        queries = [q.strip() for q in result.split('\n') if q.strip()]
        return [question] + queries[:3]  # 원래 질문 + 변형 3개
    
    def multi_query_search(self, question: str, filter: dict = None) -> List:
        """Multi-Query 검색"""
        queries = self.generate_alternative_queries(question)
        print(f"검색 쿼리들: {queries}")
        
        all_docs = []
        seen_content = set()
        
        for query in queries:
            docs = self.vectorstore_manager.search(query, k=3, filter=filter)
            for doc in docs:
                content_hash = hash(doc.page_content)
                if content_hash not in seen_content:
                    seen_content.add(content_hash)
                    all_docs.append(doc)
        
        # 상위 10개만 반환
        return all_docs[:10]
    
    def verify_answer(self, answer: str, context: str) -> Dict:
        """답변 검증"""
        chain = self.verification_prompt | self.llm | StrOutputParser()
        verification = chain.invoke({
            "context": context,
            "answer": answer
        })
        
        is_valid = "YES" in verification.upper()
        
        return {
            "is_valid": is_valid,
            "verification_message": verification
        }
    
    def invoke_with_verification(self, question: str, filter: dict = None) -> Dict:
        """검증 포함 답변 생성"""
        # Multi-Query로 문서 검색
        docs = self.multi_query_search(question, filter=filter)
        context = self.format_docs(docs)
        
        # 답변 생성
        answer_chain = self.prompt | self.llm | StrOutputParser()
        answer = answer_chain.invoke({
            "context": context,
            "question": question,
            "chat_history": self.memory.load_memory_variables({})["chat_history"]
        })
        
        # 검증
        verification = self.verify_answer(answer, context)
        
        # 메모리 업데이트
        self.memory.save_context(
            {"input": question},
            {"output": answer}
        )
        
        # 출처 정보
        sources = [
            {
                "filename": doc.metadata.get('filename', '알 수 없음'),
                "department": doc.metadata.get('department', 'general'),
                "content_preview": doc.page_content[:200]
            }
            for doc in docs[:5]  # 상위 5개만
        ]
        
        result = {
            "answer": answer,
            "sources": sources,
            "question": question,
            "verified": verification["is_valid"]
        }
        
        if not verification["is_valid"]:
            result["warning"] = "⚠️ 이 답변은 제공된 문서만으로 검증되지 않았습니다."
        
        return result

5단계: FastAPI 서버 구축

RESTful API를 만듭니다.

# app/main.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict
import os
import shutil
from core.document_processor import DocumentProcessor
from core.vectorstore import VectorStoreManager
from core.rag_chain import AdvancedRAGChain

app = FastAPI(title="사내 문서 검색 챗봇")

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 전역 객체
vectorstore_manager = VectorStoreManager()
rag_chain = AdvancedRAGChain(vectorstore_manager)

# 요청/응답 모델
class ChatRequest(BaseModel):
    question: str
    department: Optional[str] = None
    use_verification: bool = True

class ChatResponse(BaseModel):
    answer: str
    sources: List[Dict]
    verified: Optional[bool] = None
    warning: Optional[str] = None

class IndexRequest(BaseModel):
    file_paths: List[str]

@app.get("/")
async def root():
    return {"message": "사내 문서 검색 챗봇 API"}

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """채팅 엔드포인트"""
    try:
        # 부서 필터
        filter_dict = None
        if request.department:
            filter_dict = {"department": request.department}
        
        # 답변 생성
        if request.use_verification:
            result = rag_chain.invoke_with_verification(
                request.question,
                filter=filter_dict
            )
        else:
            result = rag_chain.invoke(
                request.question,
                filter=filter_dict
            )
        
        return ChatResponse(**result)
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """문서 업로드"""
    try:
        # 파일 저장
        upload_dir = "data/raw"
        os.makedirs(upload_dir, exist_ok=True)
        
        file_path = os.path.join(upload_dir, file.filename)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        
        return {"message": f"{file.filename} 업로드 완료", "path": file_path}
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/index")
async def index_documents(request: IndexRequest):
    """문서 인덱싱"""
    try:
        processor = DocumentProcessor()
        documents = processor.process_documents(request.file_paths)
        
        vectorstore_manager.create_vectorstore(documents)
        
        return {
            "message": "인덱싱 완료",
            "document_count": len(documents)
        }
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.delete("/history")
async def clear_history():
    """대화 기록 초기화"""
    rag_chain.clear_history()
    return {"message": "대화 기록이 초기화되었습니다"}

@app.get("/health")
async def health_check():
    """헬스 체크"""
    return {"status": "healthy"}

6단계: Streamlit 프론트엔드

사용자 친화적인 웹 인터페이스를 만듭니다.

# frontend/streamlit_app.py
import streamlit as st
import requests
import json

API_URL = "http://localhost:8000"

st.set_page_config(
    page_title="사내 문서 검색 챗봇",
    page_icon="🤖",
    layout="wide"
)

st.title("🤖 사내 문서 검색 챗봇")

# 사이드바
with st.sidebar:
    st.header("설정")
    
    department = st.selectbox(
        "부서 필터",
        ["전체", "HR", "Engineering", "Sales", "Marketing"]
    )
    
    use_verification = st.checkbox("답변 검증 사용", value=True)
    
    if st.button("대화 기록 초기화"):
        requests.delete(f"{API_URL}/history")
        st.success("대화 기록이 초기화되었습니다")
        st.rerun()
    
    st.divider()
    
    st.header("문서 업로드")
    uploaded_file = st.file_uploader(
        "문서 업로드",
        type=['pdf', 'docx', 'xlsx', 'md']
    )
    
    if uploaded_file and st.button("업로드"):
        files = {"file": uploaded_file}
        response = requests.post(f"{API_URL}/upload", files=files)
        if response.status_code == 200:
            st.success("업로드 완료!")
        else:
            st.error("업로드 실패")

# 메인 채팅 영역
if "messages" not in st.session_state:
    st.session_state.messages = []

# 채팅 기록 표시
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        
        if message["role"] == "assistant" and "sources" in message:
            with st.expander("📚 참고 문서"):
                for i, source in enumerate(message["sources"], 1):
                    st.write(f"**{i}. {source['filename']}** ({source['department']})")
                    st.caption(source['content_preview'] + "...")

# 사용자 입력
if prompt := st.chat_input("질문을 입력하세요"):
    # 사용자 메시지 표시
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    
    # API 호출
    with st.chat_message("assistant"):
        with st.spinner("답변 생성 중..."):
            try:
                response = requests.post(
                    f"{API_URL}/chat",
                    json={
                        "question": prompt,
                        "department": None if department == "전체" else department,
                        "use_verification": use_verification
                    }
                )
                
                if response.status_code == 200:
                    result = response.json()
                    
                    # 경고 표시
                    if result.get("warning"):
                        st.warning(result["warning"])
                    
                    # 답변 표시
                    st.markdown(result["answer"])
                    
                    # 출처 표시
                    with st.expander("📚 참고 문서"):
                        for i, source in enumerate(result["sources"], 1):
                            st.write(f"**{i}. {source['filename']}** ({source['department']})")
                            st.caption(source['content_preview'] + "...")
                    
                    # 메시지 저장
                    st.session_state.messages.append({
                        "role": "assistant",
                        "content": result["answer"],
                        "sources": result["sources"]
                    })
                else:
                    st.error(f"오류 발생: {response.text}")
            
            except Exception as e:
                st.error(f"오류 발생: {str(e)}")

7단계: 설정 관리

환경 변수와 설정을 관리합니다.

# app/config.py
from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    # OpenAI
    openai_api_key: str
    
    # 모델 설정
    model_name: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"
    
    # 청크 설정
    chunk_size: int = 1000
    chunk_overlap: int = 200
    
    # 검색 설정
    retrieval_k: int = 4
    
    # 벡터 스토어
    vectorstore_path: str = "./data/vectorstore"
    collection_name: str = "company_docs"
    
    # API 설정
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    
    class Config:
        env_file = ".env"

settings = Settings()
# .env 파일
OPENAI_API_KEY=your-api-key-here
MODEL_NAME=gpt-4o-mini
CHUNK_SIZE=1000
CHUNK_OVERLAP=200

8단계: Docker 설정

배포를 위한 Docker 설정입니다.

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 시스템 의존성 설치
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 앱 파일 복사
COPY app/ ./app/
COPY core/ ./core/
COPY utils/ ./utils/

# 데이터 디렉토리 생성
RUN mkdir -p data/raw data/processed data/vectorstore

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# Dockerfile.streamlit
FROM python:3.11-slim

WORKDIR /app

# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 프론트엔드 파일 복사
COPY frontend/ ./frontend/

EXPOSE 8501

CMD ["streamlit", "run", "frontend/streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
# docker-compose.yml
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: chatbot-api
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - MODEL_NAME=${MODEL_NAME:-gpt-4o-mini}
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  
  frontend:
    build:
      context: .
      dockerfile: Dockerfile.streamlit
    container_name: chatbot-frontend
    ports:
      - "8501:8501"
    depends_on:
      - api
    environment:
      - API_URL=http://api:8000
    restart: unless-stopped

volumes:
  data:
  logs:
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
langchain==0.1.0
langchain-openai==0.0.2
langchain-community==0.0.10
qdrant-client==1.7.0
python-multipart==0.0.6
pydantic==2.5.0
pydantic-settings==2.1.0
python-dotenv==1.0.0
streamlit==1.29.0
requests==2.31.0
pypdf==3.17.1
unstructured==0.11.2
python-docx==1.1.0
openpyxl==3.1.2
tiktoken==0.5.2

9단계: 로깅 및 모니터링

운영 환경에서 필수적인 로깅과 모니터링을 추가합니다.

# utils/logging.py
import logging
from logging.handlers import RotatingFileHandler
import os
from datetime import datetime

def setup_logger(name: str, log_file: str = None):
    """로거 설정"""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    # 포맷 설정
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 콘솔 핸들러
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # 파일 핸들러 (옵션)
    if log_file:
        os.makedirs('logs', exist_ok=True)
        file_handler = RotatingFileHandler(
            f'logs/{log_file}',
            maxBytes=10485760,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    
    return logger
# app/main.py에 로깅 추가
from utils.logging import setup_logger
import time

logger = setup_logger('chatbot_api', 'api.log')

# 미들웨어로 요청/응답 로깅
@app.middleware("http")
async def log_requests(request, call_next):
    start_time = time.time()
    
    logger.info(f"요청 시작: {request.method} {request.url.path}")
    
    response = await call_next(request)
    
    duration = time.time() - start_time
    logger.info(f"요청 완료: {request.method} {request.url.path} - {response.status_code} - {duration:.2f}s")
    
    return response

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """채팅 엔드포인트"""
    logger.info(f"채팅 요청: {request.question[:50]}...")
    
    try:
        filter_dict = None
        if request.department:
            filter_dict = {"department": request.department}
            logger.info(f"부서 필터 적용: {request.department}")
        
        if request.use_verification:
            result = rag_chain.invoke_with_verification(
                request.question,
                filter=filter_dict
            )
        else:
            result = rag_chain.invoke(
                request.question,
                filter=filter_dict
            )
        
        logger.info(f"답변 생성 완료 (검증: {result.get('verified', 'N/A')})")
        return ChatResponse(**result)
    
    except Exception as e:
        logger.error(f"채팅 오류: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))

10단계: 성능 최적화

캐싱과 배치 처리로 성능을 개선합니다.

# utils/cache.py
from functools import lru_cache
import hashlib
import json
import os

class ResponseCache:
    """응답 캐싱"""
    def __init__(self, cache_dir: str = "data/cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    
    def _get_cache_key(self, question: str, filter_dict: dict = None) -> str:
        """캐시 키 생성"""
        data = {
            "question": question,
            "filter": filter_dict
        }
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()
    
    def get(self, question: str, filter_dict: dict = None):
        """캐시에서 가져오기"""
        key = self._get_cache_key(question, filter_dict)
        cache_file = os.path.join(self.cache_dir, f"{key}.json")
        
        if os.path.exists(cache_file):
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None
    
    def set(self, question: str, result: dict, filter_dict: dict = None):
        """캐시에 저장"""
        key = self._get_cache_key(question, filter_dict)
        cache_file = os.path.join(self.cache_dir, f"{key}.json")
        
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
    
    def clear(self):
        """캐시 전체 삭제"""
        for file in os.listdir(self.cache_dir):
            os.remove(os.path.join(self.cache_dir, file))
# app/main.py에 캐싱 추가
from utils.cache import ResponseCache

cache = ResponseCache()

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """채팅 엔드포인트 (캐싱 포함)"""
    logger.info(f"채팅 요청: {request.question[:50]}...")
    
    try:
        filter_dict = None
        if request.department:
            filter_dict = {"department": request.department}
        
        # 캐시 확인
        cached_result = cache.get(request.question, filter_dict)
        if cached_result:
            logger.info("캐시에서 응답 반환")
            return ChatResponse(**cached_result)
        
        # 답변 생성
        if request.use_verification:
            result = rag_chain.invoke_with_verification(
                request.question,
                filter=filter_dict
            )
        else:
            result = rag_chain.invoke(
                request.question,
                filter=filter_dict
            )
        
        # 캐시에 저장
        cache.set(request.question, result, filter_dict)
        
        logger.info(f"답변 생성 완료")
        return ChatResponse(**result)
    
    except Exception as e:
        logger.error(f"채팅 오류: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))

@app.delete("/cache")
async def clear_cache():
    """캐시 삭제"""
    cache.clear()
    return {"message": "캐시가 삭제되었습니다"}

11단계: 배치 처리 및 스케줄링

정기적인 문서 업데이트를 자동화합니다.

# scripts/scheduled_indexing.py
import schedule
import time
from pathlib import Path
from core.document_processor import DocumentProcessor
from core.vectorstore import VectorStoreManager
from utils.logging import setup_logger
from datetime import datetime

logger = setup_logger('scheduled_indexing', 'scheduled_indexing.log')

def update_index():
    """새로운 문서 인덱싱"""
    logger.info("스케줄된 인덱싱 시작")
    
    try:
        # 새로운 문서 찾기
        raw_dir = Path("data/raw")
        processed_dir = Path("data/processed")
        
        new_files = []
        for ext in ['*.pdf', '*.docx', '*.xlsx', '*.md']:
            for file_path in raw_dir.rglob(ext):
                # 처리되지 않은 파일만
                marker_file = processed_dir / f"{file_path.name}.processed"
                if not marker_file.exists():
                    new_files.append(str(file_path))
        
        if not new_files:
            logger.info("새로운 문서 없음")
            return
        
        logger.info(f"{len(new_files)}개 새 문서 발견")
        
        # 문서 처리
        processor = DocumentProcessor()
        documents = processor.process_documents(new_files)
        
        # 벡터 스토어에 추가
        vectorstore_manager = VectorStoreManager()
        vectorstore_manager.add_documents(documents)
        
        # 처리 완료 마킹
        processed_dir.mkdir(exist_ok=True)
        for file_path in new_files:
            marker_file = processed_dir / f"{Path(file_path).name}.processed"
            marker_file.touch()
        
        logger.info(f"인덱싱 완료: {len(documents)}개 청크 추가")
    
    except Exception as e:
        logger.error(f"인덱싱 오류: {str(e)}", exc_info=True)

def run_scheduler():
    """스케줄러 실행"""
    # 매일 새벽 2시에 실행
    schedule.every().day.at("02:00").do(update_index)
    
    # 또는 1시간마다
    # schedule.every().hour.do(update_index)
    
    logger.info("스케줄러 시작")
    
    while True:
        schedule.run_pending()
        time.sleep(60)

if __name__ == "__main__":
    run_scheduler()

12단계: 보안 강화

API 인증과 권한 관리를 추가합니다.

# app/auth.py
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
from typing import Optional

SECRET_KEY = "your-secret-key-change-this"  # 실제로는 환경변수에서
ALGORITHM = "HS256"

security = HTTPBearer()

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """액세스 토큰 생성"""
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(hours=24)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """토큰 검증"""
    try:
        token = credentials.credentials
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        
        if user_id is None:
            raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
        
        return payload
    
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="토큰이 만료되었습니다")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="토큰 검증 실패")
# app/main.py에 인증 추가
from app.auth import verify_token, create_access_token
from datetime import timedelta

@app.post("/token")
async def login(username: str, password: str):
    """로그인 (간단한 예제)"""
    # 실제로는 데이터베이스에서 사용자 확인
    if username == "admin" and password == "password":
        access_token = create_access_token(
            data={"sub": username},
            expires_delta=timedelta(hours=24)
        )
        return {"access_token": access_token, "token_type": "bearer"}
    
    raise HTTPException(status_code=401, detail="인증 실패")

@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    user = Depends(verify_token)  # 인증 필요
):
    """채팅 엔드포인트 (인증 필요)"""
    logger.info(f"사용자 {user['sub']}의 채팅 요청")
    
    # ... 나머지 코드 동일

13단계: 메트릭 수집

시스템 성능을 추적하기 위한 메트릭을 수집합니다.

# utils/metrics.py
from datetime import datetime
import json
import os

class MetricsCollector:
    def __init__(self, metrics_file: str = "data/metrics.jsonl"):
        self.metrics_file = metrics_file
    
    def log_query(
        self,
        question: str,
        response_time: float,
        verified: bool,
        num_sources: int,
        user_id: str = None
    ):
        """쿼리 메트릭 기록"""
        metric = {
            "timestamp": datetime.now().isoformat(),
            "question_length": len(question),
            "response_time": response_time,
            "verified": verified,
            "num_sources": num_sources,
            "user_id": user_id
        }
        
        with open(self.metrics_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(metric, ensure_ascii=False) + '\n')
    
    def get_stats(self, hours: int = 24):
        """통계 조회"""
        if not os.path.exists(self.metrics_file):
            return {}
        
        metrics = []
        cutoff = datetime.now().timestamp() - (hours * 3600)
        
        with open(self.metrics_file, 'r', encoding='utf-8') as f:
            for line in f:
                metric = json.loads(line)
                timestamp = datetime.fromisoformat(metric['timestamp']).timestamp()
                if timestamp > cutoff:
                    metrics.append(metric)
        
        if not metrics:
            return {}
        
        return {
            "total_queries": len(metrics),
            "avg_response_time": sum(m['response_time'] for m in metrics) / len(metrics),
            "verification_rate": sum(1 for m in metrics if m['verified']) / len(metrics),
            "avg_sources": sum(m['num_sources'] for m in metrics) / len(metrics)
        }
# app/main.py에 메트릭 수집 추가
from utils.metrics import MetricsCollector
import time

metrics_collector = MetricsCollector()

@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    user = Depends(verify_token)
):
    """채팅 엔드포인트"""
    start_time = time.time()
    
    try:
        # ... 답변 생성 코드
        
        # 메트릭 기록
        response_time = time.time() - start_time
        metrics_collector.log_query(
            question=request.question,
            response_time=response_time,
            verified=result.get('verified', False),
            num_sources=len(result['sources']),
            user_id=user['sub']
        )
        
        return ChatResponse(**result)
    
    except Exception as e:
        logger.error(f"채팅 오류: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def get_metrics(hours: int = 24, user = Depends(verify_token)):
    """메트릭 조회"""
    stats = metrics_collector.get_stats(hours=hours)
    return stats

14단계: 에러 처리 및 재시도

안정성을 높이기 위한 에러 처리와 재시도 로직입니다.

# utils/retry.py
from functools import wraps
import time
from utils.logging import setup_logger

logger = setup_logger('retry')

def retry_on_failure(max_attempts=3, delay=1, backoff=2):
    """재시도 데코레이터"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 1
            current_delay = delay
            
            while attempt <= max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} 최종 실패: {str(e)}")
                        raise
                    
                    logger.warning(
                        f"{func.__name__} 실패 (시도 {attempt}/{max_attempts}): {str(e)}. "
                        f"{current_delay}초 후 재시도..."
                    )
                    
                    time.sleep(current_delay)
                    current_delay *= backoff
                    attempt += 1
        
        return wrapper
    return decorator
# core/rag_chain.py에 재시도 적용
from utils.retry import retry_on_failure

class AdvancedRAGChain(RAGChain):
    
    @retry_on_failure(max_attempts=3, delay=1)
    def invoke_with_verification(self, question: str, filter: dict = None) -> Dict:
        """검증 포함 답변 생성 (재시도 포함)"""
        # ... 기존 코드

15단계: 환경별 설정 관리

개발, 테스트, 프로덕션 환경을 분리합니다.

# app/config.py 확장
from pydantic_settings import BaseSettings
from typing import Optional
from enum import Enum

class Environment(str, Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Settings(BaseSettings):
    # 환경
    environment: Environment = Environment.DEVELOPMENT
    
    # OpenAI
    openai_api_key: str
    
    # 모델 설정
    model_name: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"
    
    # 청크 설정
    chunk_size: int = 1000
    chunk_overlap: int = 200
    
    # 검색 설정
    retrieval_k: int = 4
    
    # 벡터 스토어
    vectorstore_path: str = "./data/vectorstore"
    collection_name: str = "company_docs"
    
    # API 설정
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    debug: bool = False
    
    # 보안
    secret_key: str
    access_token_expire_hours: int = 24
    
    # 캐싱
    enable_cache: bool = True
    cache_ttl_seconds: int = 3600
    
    # 로깅
    log_level: str = "INFO"
    
    # 성능
    max_concurrent_requests: int = 10
    request_timeout_seconds: int = 60
    
    class Config:
        env_file = ".env"
    
    @property
    def is_production(self) -> bool:
        return self.environment == Environment.PRODUCTION

# 환경별 설정
def get_settings() -> Settings:
    return Settings()

settings = get_settings()
# .env.development
ENVIRONMENT=development
OPENAI_API_KEY=your-dev-key
MODEL_NAME=gpt-3.5-turbo
DEBUG=true
LOG_LEVEL=DEBUG
SECRET_KEY=dev-secret-key

# .env.production
ENVIRONMENT=production
OPENAI_API_KEY=your-prod-key
MODEL_NAME=gpt-4o-mini
DEBUG=false
LOG_LEVEL=INFO
SECRET_KEY=prod-secret-key-change-this
ENABLE_CACHE=true
MAX_CONCURRENT_REQUESTS=50

16단계: 배포 스크립트

배포를 자동화하는 스크립트입니다.

# scripts/deploy.sh
#!/bin/bash

set -e

echo "🚀 챗봇 배포 시작..."

# 환경 변수 확인
if [ ! -f .env ]; then
    echo "❌ .env 파일이 없습니다"
    exit 1
fi

# Docker 이미지 빌드
echo "📦 Docker 이미지 빌드 중..."
docker-compose build

# 기존 컨테이너 중지
echo "🛑 기존 컨테이너 중지..."
docker-compose down

# 새 컨테이너 시작
echo "▶️  새 컨테이너 시작..."
docker-compose up -d

# 헬스 체크
echo "🏥 헬스 체크 중..."
sleep 10

if curl -f http://localhost:8000/health > /dev/null 2>&1; then
    echo "✅ API 서버 정상 작동"
else
    echo "❌ API 서버 시작 실패"
    docker-compose logs api
    exit 1
fi

echo "✨ 배포 완료!"
echo "API: http://localhost:8000"
echo "UI: http://localhost:8501"
# scripts/backup.sh
#!/bin/bash

set -e

BACKUP_DIR="backups/$(date +%Y%m%d_%H%M%S)"

echo "💾 백업 시작..."

mkdir -p $BACKUP_DIR

# 벡터 스토어 백업
echo "벡터 스토어 백업 중..."
cp -r data/vectorstore $BACKUP_DIR/

# 로그 백업
echo "로그 백업 중..."
cp -r logs $BACKUP_DIR/

# 설정 백업
echo "설정 백업 중..."
cp .env $BACKUP_DIR/

echo "✅ 백업 완료: $BACKUP_DIR"

17단계: 운영 가이드 문서

로컬 개발 환경

# 의존성 설치
pip install -r requirements.txt

# 환경 변수 설정
cp .env.example .env
# .env 파일을 편집하여 API 키 등을 설정

# 초기 문서 인덱싱
python scripts/initial_indexing.py

# API 서버 실행
uvicorn app.main:app --reload

# Streamlit UI 실행 (별도 터미널)
streamlit run frontend/streamlit_app.py


⸻

Docker로 실행

# 빌드 및 실행
docker-compose up --build -d

# 로그 확인
docker-compose logs -f

# 중지
docker-compose down


⸻

일상 운영

새 문서 추가
	1.	data/raw/ 폴더에 문서 업로드
	2.	자동 인덱싱 대기 (매일 새벽 2)
	3.	또는 수동 인덱싱 실행

python scripts/manual_indexing.py


⸻

캐시 관리

# 캐시 삭제
curl -X DELETE http://localhost:8000/cache


⸻

로그 확인

# API 로그
tail -f logs/api.log

# 인덱싱 로그
tail -f logs/scheduled_indexing.log


⸻

메트릭 확인

# 최근 24시간 메트릭
curl http://localhost:8000/metrics?hours=24


⸻

문제 해결

API 서버가 시작되지 않음
	1.	로그 확인

docker-compose logs api


	2.	환경 변수 확인 (.env 파일의 API 키)
	3.	포트 충돌 확인

lsof -i :8000



⸻

검색 결과가 부정확함
	•	문서 재인덱싱
	•	config.py에서 chunk_size 조정
	•	retrieval_k 값 증가

⸻

응답이 느림
	•	캐싱 활성화 여부 확인
	•	모델을 gpt-3.5-turbo로 변경
	•	retrieval_k 값 감소

⸻

백업 및 복구

백업

./scripts/backup.sh


⸻

복구

# 특정 백업에서 복구
cp -r backups/20240115_120000/vectorstore data/


⸻

보안 체크리스트
	•	.env 파일의 SECRET_KEY 변경
	•	프로덕션 환경에서 DEBUG=false 설정
	•	HTTPS 사용 (프록시 서버 설정)
	•	API 인증 활성화
	•	정기적인 백업 스케줄 설정

⸻

성능 튜닝

응답 속도 개선
	•	캐싱 활용
자주 묻는 질문은 캐시에서 즉시 응답
	•	모델 선택
간단한 질문은 gpt-3.5-turbo 사용
	•	청크 크기 조정
너무 크면 느리고, 너무 작으면 맥락 부족

⸻

비용 최적화
	•	임베딩 모델
text-embedding-3-small 사용
	•	LLM 호출 최소화
캐싱과 검증 로직을 선택적으로 사용
	•	배치 처리
여러 문서를 한 번에 인덱싱

⸻

모니터링

주요 지표
	•	평균 응답 시간
	•	검증 성공률
	•	일일 쿼리 수
	•	캐시 히트율

⸻

알림 설정

# 응답 시간이 5초 이상일 때 알림 (예시)
# 실제 운영 환경에서는 Prometheus, Grafana 등 사용 권장


⸻

업데이트

시스템 업데이트

# 코드 업데이트
git pull

# 의존성 업데이트
pip install -r requirements.txt --upgrade

# 재배포
./scripts/deploy.sh


⸻

롤백

# 이전 버전으로 롤백
git checkout <commit-hash>
docker-compose down
docker-compose up --build -d

마무리하며

이제 완전히 배포 가능한 RAG 시스템이 완성되었습니다. 문서 처리부터 API, 프론트엔드, Docker 배포, 모니터링, 보안까지 전체 스택을 다뤘습니다.

실제 운영에서는 사용자 피드백을 받아가며 점진적으로 개선하는 것이 중요합니다. 처음부터 모든 기능을 완벽하게 만들려 하지 말고, 핵심 기능부터 시작해서 필요에 따라 확장하세요.

이 시리즈가 여러분의 LLM 프로젝트에 도움이 되었기를 바랍니다!🤓

profile
👩🏻‍💻

0개의 댓글