창업 대회를 준비하면서 Spring을 활용하여 AI 서비스를 구현하려고 했는데, 조금씩 어려움을 겪었다. 지금은 langchain4j을 이용해서 자바로 랭체인을 만들 수 있지만, 아직은 베타 버전이라 기능이 제한적이었어고 그래서 결국에는 REST-API 서버를 만들어서 프로젝트 방향을 바꿨다...
사실은 REST-API을 프론트엔드에서 바로 호출할 수 있었지만, 경험을 위해 스프링을 사용해서 REST API를 호출하는 방법을 선택했다. 좀 더 효율적인 방법이었으면 좋았을 텐데, 그래도 좋은 경험이 되겠죠?
랭체인을 구현하는 방법도 여러 가지 있었다. 처음에는 Colab에서 서버를 날먹으로 무료로 구축하려고 했는데, 외부 네트워크 연결 때문에 라이브러리을 써야 한다는 걸 알게 됐다. 그래서 구현했지만 API 호출하면 호출이 안되고 유료버전부터 API호출 가능하였다.. 다른 방법을 찾으려고 했지만 너무 복잡해서 결국 자바로 랭체인을 구현하는걸 시도했지만 테스트 후 기능 추가하려고 했는데 너무 제한적이었다 그래서 결국 REST-API 서버를 만들기로 결정했다.
REST API 실행 흐름
데이터 정제 -> 데이터 로드 후 벡터DB저장 -> 벡터DB 불러와서 챗봇에게 주입
맞는 말이지만 효율을 추구하는 나는 단순하고 접근성이 높고 처리속도가 빠른 txt파일 정제을 택했다. 단순하기에 읽기도 빠르기 때문이다. PDF바로 로드하는 게 정제과정이 없어서 효율적일 수 있지만 내가 추구하는 서비스는 빅데이터이기에 이게 더 효율적이라고 생각했다.
def convert_pdf_directory_to_txt(pdf_dir, txt_dir):
# 주어진 디렉토리 내의 모든 PDF 파일 검색
pdf_files = glob.glob(os.path.join(pdf_dir, "*.pdf"))
for pdf_path in pdf_files:
# 출력될 TXT 파일의 경로 생성
txt_path = os.path.join(txt_dir, os.path.basename(pdf_path).replace('.pdf', '.txt'))
# 해당 TXT 파일이 이미 존재하는지 확인
if os.path.exists(txt_path):
print(f"이미 변환된 파일이 존재합니다: {txt_path}")
continue # 이미 존재한다면 이 파일은 건너뛰기
# PDF 파일 열기
doc = fitz.open(pdf_path)
# 추출된 텍스트를 저장할 변수 초기화
text = ""
# 각 페이지를 순회하며 텍스트 추출
for page in doc:
text += page.get_text()
# 불필요한 공백 문자 제거 (연속된 공백은 하나로 축소)
text = ' '.join(text.split())
# 텍스트 파일로 저장
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(text)
print(f"PDF 파일이 성공적으로 변환되었습니다: {txt_path}")```
def data_setup(path, db_directory):
"""
주어진 경로에서 PDF 문서를 로드하고, 텍스트 분할 후 벡터 데이터베이스를 생성합니다.
매개변수:
- path: 문서가 위치한 디렉토리 경로
- db_directory: 벡터 데이터베이스를 저장할 디렉토리 경로
반환값:
- 성공 여부, 저장된 데이터베이스 크기(MB), 청크 사이즈 정보를 담은 딕셔너리
"""
pdf_directory = 'data/pdf' # PDF 파일들이 위치한 디렉토리 경로
txt_directory = 'data/txt' # 변환된 TXT 파일들을 저장할 디렉토리 경로
convert_pdf_directory_to_txt(pdf_directory, txt_directory)
# PDF 문서 로딩
loader = DirectoryLoader(path, glob="*.txt", loader_cls=TextLoader)
documents = loader.load() # 문서 로드
# 텍스트 분할
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
texts = text_splitter.split_documents(documents) # 문서를 텍스트 청크로 분할
# 벡터 데이터베이스 생성 및 저장
vectordb = Chroma.from_documents(
documents=texts,
embedding=OpenAIEmbeddings(),
persist_directory=db_directory
)
vectordb.persist() # 데이터베이스 저장
# 저장된 데이터베이스의 총 크기 계산
total_size = sum(os.path.getsize(os.path.join(dirpath, f)) for dirpath, _, filenames in os.walk(db_directory) for f in filenames)
total_size_mb = total_size / (1024 * 1024) # MB 단위로 변환
# 결과 반환
return {
"success": True,
"stored_size_mb": round(total_size_mb, 1),
"chunk_size": len(texts),
}
class QAService:
def __init__(self, db_directory='db'):
"""
QAService 클래스 초기화
매개변수:
- db_directory: 벡터 데이터베이스가 저장된 디렉토리의 경로
"""
self.db_directory = db_directory
self.embedding = OpenAIEmbeddings() # 임베딩 모델 초기화
def load_vector_db(self):
"""
벡터 데이터베이스를 로드하여 검색 엔진으로 사용할 수 있게 준비합니다.
반환값:
- 검색을 위한 retriever 객체
"""
vectordb = Chroma(persist_directory=self.db_directory, embedding_function=self.embedding)
return vectordb.as_retriever()
def initialize_qa_chain(self, retriever):
"""
검색 및 질의응답 체인을 초기화합니다.
매개변수:
- retriever: 검색을 수행할 retriever 객체
반환값:
- 초기화된 RetrievalQA 객체
"""
return RetrievalQA.from_chain_type(
llm=ChatOpenAI(model="gpt-4-1106-preview"),
chain_type="stuff",
retriever=retriever,
return_source_documents=True)
def process_llm_response(self, llm_response,cb):
"""
LLM의 응답을 처리하고 출력합니다.
매개변수:
- llm_response: LLM으로부터 받은 응답
"""
response_data = {
"response_data" :{
'result': llm_response['result'],
'sources': [source.metadata['source'] for source in llm_response["source_documents"]]
},
"Tokens" :{
'Total Tokens' : cb.total_tokens,
'Prompt Tokens' : cb.prompt_tokens,
'Completion Tokens' : cb.completion_tokens,
'Total Cost (USD$)': cb.total_cost
}
}
return response_data
def qacall(self, query):
"""
주어진 쿼리에 대해 QA 체인을 실행하고 응답을 처리한 후 그 결과를 반환합니다.
매개변수:
- query: 사용자로부터 받은 쿼리 문자열
반환값:
- 처리된 응답 데이터와 소스 문서 정보가 담긴 JSON 객체
"""
retriever = self.load_vector_db() # 벡터 DB 로드
qa_chain = self.initialize_qa_chain(retriever) # QA 체인 초기화
with get_openai_callback() as cb:
llm_response = qa_chain.invoke(query) # 쿼리 실행
response_data = self.process_llm_response(llm_response,cb) # 응답 처리 및 데이터 변환
return response_data,
APIRouter
from fastapi import APIRouter
from BaseModel.modeldata import user
from service.qa_service import QAService
chat = APIRouter()
# QAService 인스턴스 생성
qa_service = QAService()
@chat.post("/chat")
def analyze_policy_endpoint(data: user):
# QAService 클래스의 인스턴스를 사용하여 qacall 메서드 호출
result = qa_service.qacall(data.query)
return result
호출 후 결과값
후기
아직 완벽한 코드는 아니지만 틀이라도 잡아서 다행이라고 생각한다...
Langchain에 대해 무작정 코드만 따라쓰다 어느 순간 docs을 찾으면서
코드의 흐름 코드에 역할을 눈에 익혔다. 시행착오가 있엇지만 구현하고싶은 것을 구현을 했다는 것에 의미을 두고싶다.
아직 문제가 많다 채팅을 이어가게 메모리설정,토큰 최적화,코드효율성 높이기등
일단 이제 플러터로 비율을 조금 높혀 앱에서 테스트로 쓰게 메인 코드작성을 앱으로 넘어갈 예정이다.
스프링은 자신있기에 금방 코드을 작성할 거 같아서 플러터쓰면서 필요한 코드가 있을때마다 스프링도 구현할 예정이다.
GitHub 프로젝트파일
문제점이나 잘못된 코드가 있으면 댓글에 써주시면 고쳐 나가겠습니다.
궁금한 것도 댓글에 써주시면 아는 만큼 답변해드리겠습니다.