Table data를 VectorDB에 저장하고 쿼리하는 고군분투기 (with ChromaDB, FAISS) - (2)

JeongYun Lee·2024년 11월 24일
0

LLM

목록 보기
9/9
post-thumbnail

앞서 1편에서 ChromaDB를 통해 저장한 vector, documents, metadata를 FAISS에 Insert 하는 방법이다.

거듭 언급하지만, 처음 하는 사람은 ChromaDB와 FAISS를 모두 사용한 이 방법은 사용할 필요가 전혀 없다! 두 vectorDB 모두 임베딩을 하면서 저장하고 similarity로 쿼리하는 기능을 제공하며, 데이터가 많은 경우에 쿼리 속도가 중요하다면 FAISS를 선택하면 되지만, 데이터 양이 적은 경우 큰 차이 없으니 사용하기 쉬운 ChromaDB를 선택하면 된다.
.
.
.
FAISS를 사용해서 테이블 데이터를 임베딩과 함께 저장하는 방법과 이미 임베딩된 벡터와 documents, metadata를 단순히 '저장'하는 두 방법을 모두 기록해본다.


임베딩 + 저장

이 방법은 사실 예제 코드도 많고, 튜토리얼을 잘 따라만 하면된다. 필자의 경우 langchain의 예제 코드테디노트님의 langchain wikidocs를 많이 참고했다.

csv 데이터를 Document 단위로 만들기

from langchain.schema import Document

documents = [
    Document(page_content=doc, metadata=meta)
    for doc, meta in zip(contents, metadatas)
]

documents

langchain의 모듈을 활용해서 앞서 저장했던 metadata와 contents를 각각 metadata, page_content로 지정하여 형식을 맞춰준다.

Index 생성

FAISS에서는 Index가 chromaDB의 collection 단위 역할을 한다. 이때 metadata, contents, vector를 한 collection에 저장하는 chromaDB와 달리 FAISS의 Index에는 vector만 저장한다(아마도 그래서 속도가 빠르지 싶다). 처음에 이 개념들이 잘 이해가 안돼서 한참 헤맸는데, vector_store라는 변수를 하나 지정해서 사용하되, 실제 저장은 분리해서 한다는 결론을 내렸다.

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

index = faiss.IndexFlatL2(len(embeddings.embed_query("hello world")))

vector_store = FAISS(
    embedding_function=embeddings,
    index=index,
    docstore=InMemoryDocstore(),
    index_to_docstore_id={},
)

임베딩 모델을 openai의 text-embedding-large-3를 사용했고, dimention이 3072인 것을 지정해준다.

# 총 길이 확인하기
num_documents = len(vector_store.index_to_docstore_id)
num_documents
# ID 리스트 확인하기
vector_store.index_to_docstore_id
# ID와 데이터 확인하기
vector_store.docstore._dict
# 전체 벡터 array 확인하기
all_vectors = vector_store.index.reconstruct_n(0, vector_store.index.ntotal)
all_vectors

위 코드들은 Index와 vector_store에 저장된 값들의 정보를 확인할 수 있는 방법들이다.

데이터 저장

from langchain.schema import Document
from uuid import uuid4

# UUID 생성
uuids = [str(uuid4()) for _ in range(len(documents))]

# FAISS에 추가
vector_store.add_documents(
    documents=documents,
    embeddings=embeddings,
    ids=uuids
)

앞서 생성한 documents를 저장하는데, 이때 ID를 UUID로 임의로 생성해서 넣어준다. unique하기만 하다면 데이터 내의 식별자를 지정해줘도 될 것 같다.

임베딩 벡터 저장

index를 생성하는 부분은 동일하다. 여기서는 이미 임베딩 된 벡터를 어떻게 index로 넣는지에 대한 방법을 batch로 실행한다.

코드는 claude의 최적화를 거쳤다.

하나의 index에 모두 저장

import json
import numpy as np
from uuid import uuid4
from langchain.schema import Document
import os
from tqdm import tqdm  # 진행률 표시용

def process_batch(batch_num, vector_store):
    try:
        # 파일 경로
        doc_path = f'embeddings_output/documents_batch_{batch_num}.json'
        meta_path = f'embeddings_output/metadatas_batch_{batch_num}.json'
        emb_path = f'embeddings_output/embeddings_batch_{batch_num}.npy'
        
        # 파일 존재 확인
        if not all(os.path.exists(f) for f in [doc_path, meta_path, emb_path]):
            return False
        
        # 데이터 로드
        with open(doc_path, 'r', encoding='utf-8') as f:
            contents = json.load(f)
        with open(meta_path, 'r', encoding='utf-8') as f:
            metadatas = json.load(f)
        embedding_vectors = np.load(emb_path)
        
        # Document 객체 생성
        documents = [
            Document(page_content=doc, metadata=meta)
            for doc, meta in zip(contents, metadatas)
        ]
        
        # UUID 생성
        uuids = [str(uuid4()) for _ in range(len(documents))]
        
        # 현재 인덱스 상태 저장
        current_total = vector_store.index.ntotal
        
        # 벡터 저장
        vector_store.index.add(embedding_vectors)
        
        # docstore에 문서 추가
        for uuid, doc in zip(uuids, documents):
            vector_store.docstore._dict[uuid] = doc
        
        # index_to_docstore_id 매핑 업데이트
        for i, new_id in enumerate(uuids):
            vector_store.index_to_docstore_id[current_total + i] = new_id
        
        # 메모리 정리
        del contents, metadatas, embedding_vectors, documents, uuids
        
        return True
        
    except Exception as e:
        print(f"Error processing batch {batch_num}: {str(e)}")
        return False

def process_all_batches(vector_store, start_batch=1):
    
    # 마지막 배치 번호 찾기
    batch_files = [f for f in os.listdir('embeddings_output') if f.startswith('embeddings_batch_') and f.endswith('.npy')]
    max_batch = max([int(f.split('_')[-1].split('.')[0]) for f in batch_files if f.split('_')[-1].split('.')[0].isdigit()])
    
    print(f"Starting process from batch {start_batch} to {max_batch}")
    
    # 진행률 표시와 함께 배치 처리
    for batch_num in tqdm(range(start_batch, max_batch + 1), desc="Processing batches"):
        print(f"\nProcessing batch {batch_num}")
        
        success = process_batch(batch_num, vector_store)
        
        if success:
            print(f"Current vector count: {vector_store.index.ntotal}")
            print(f"Current document count: {len(vector_store.docstore._dict)}")
        else:
            print(f"Skipped batch {batch_num}")
    
    # final 배치 처리
    if os.path.exists('embeddings_output/embeddings_batch_final.npy'):
        print("\nProcessing final batch")
        success = process_batch('final', vector_store)
        if success:
            print("Final batch processed successfully")

    print("\nProcess completed!")
    print(f"Total vectors: {vector_store.index.ntotal}")
    print(f"Total documents: {len(vector_store.docstore._dict)}")
    
    # 무결성 검사
    assert vector_store.index.ntotal == len(vector_store.docstore._dict), "Vector count and document count mismatch!"
    assert vector_store.index.ntotal == len(vector_store.index_to_docstore_id), "Vector count and mapping count mismatch!"

process_batch는 앞서 10000 단위로 쪼개서 저장한 npy와 json 파일들을 각 batch 단위별로 불러서 Documents 형식으로 변환하고, 벡터는 index에 add를 통해서 넣어주고 나머지 metadata와 documents는 uuid에 맞게 저장해준다. 그리고 uuid기준으로 벡터를 맵핑하여 추후 찾을 수 있도록 한다. process_all_batches는 execute 함수이다.

try:
    # 시작할 배치 번호 지정 (중단된 지점부터 다시 시작하려면 여기서 번호 지정)
    start_batch = 1
    
    process_all_batches(vector_store, start_batch)
    
except Exception as e:
    print(f"Error occurred: {str(e)}")
    raise

이렇게 하면 다시 page_content를 임베딩해서 저장하지 않고, 원래 있는 벡터를 그대로 재사용할 수 있다.

두 개의 index로 분리해서 저장

위 방법대로 하고 쿼리를 진행하니, ChromaDB보다는 훨씬 빨랐지만, 여전히 200만행 정도를 쿼리하는데 1분 정도의 시간이 소요되어 서비스에서 쓰기에는 애매한 부분이 있었다. 하나의 index에 너무 많이 저장해서 그런가 싶어서 100만행씩 쪼개서 저장해보았다.

import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
import json
import numpy as np
from uuid import uuid4
from langchain.schema import Document
import os
from tqdm import tqdm

class SplitVectorStore:
    def __init__(self, embeddings):
        embedding_dim = len(embeddings.embed_query("hello world"))
        
        # 두 개의 개별 인덱스 생성
        self.index1 = faiss.IndexFlatL2(embedding_dim)
        self.index2 = faiss.IndexFlatL2(embedding_dim)
        
        # 두 개의 벡터 스토어 생성
        self.vector_store1 = FAISS(
            embedding_function=embeddings,
            index=self.index1,
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
        )
        
        self.vector_store2 = FAISS(
            embedding_function=embeddings,
            index=self.index2,
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
        )
    
    def process_batch(self, batch_num, is_first_store=True):
        try:
            # 파일 경로
            doc_path = f'embeddings_output/documents_batch_{batch_num}.json'
            meta_path = f'embeddings_output/metadatas_batch_{batch_num}.json'
            emb_path = f'embeddings_output/embeddings_batch_{batch_num}.npy'
            
            # 파일 존재 확인
            if not all(os.path.exists(f) for f in [doc_path, meta_path, emb_path]):
                return False
            
            # 데이터 로드
            with open(doc_path, 'r', encoding='utf-8') as f:
                contents = json.load(f)
            with open(meta_path, 'r', encoding='utf-8') as f:
                metadatas = json.load(f)
            embedding_vectors = np.load(emb_path)
            
            # Document 객체 생성
            documents = [
                Document(page_content=doc, metadata=meta)
                for doc, meta in zip(contents, metadatas)
            ]
            
            # UUID 생성
            uuids = [str(uuid4()) for _ in range(len(documents))]
            
            # 사용할 벡터 스토어 선택
            vector_store = self.vector_store1 if is_first_store else self.vector_store2
            
            # 현재 인덱스 상태 저장
            current_total = vector_store.index.ntotal
            
            # 벡터 저장
            vector_store.index.add(embedding_vectors)
            
            # docstore에 문서 추가
            for uuid, doc in zip(uuids, documents):
                vector_store.docstore._dict[uuid] = doc
            
            # index_to_docstore_id 매핑 업데이트
            for i, new_id in enumerate(uuids):
                vector_store.index_to_docstore_id[current_total + i] = new_id
            
            # 메모리 정리
            del contents, metadatas, embedding_vectors, documents, uuids
            
            return True
            
        except Exception as e:
            print(f"Error processing batch {batch_num}: {str(e)}")
            return False

    def process_all_batches(self, start_batch=1):
        # 마지막 배치 번호 찾기
        batch_files = [f for f in os.listdir('embeddings_output') if f.startswith('embeddings_batch_') and f.endswith('.npy')]
        max_batch = max([int(f.split('_')[-1].split('.')[0]) for f in batch_files if f.split('_')[-1].split('.')[0].isdigit()])
        
        print(f"Starting process from batch {start_batch} to {max_batch}")
        
        # 중간 지점 계산
        mid_point = (max_batch + 1) // 2
        
        # 첫 번째 절반 처리 (vector_store1)
        print("\nProcessing first half of batches...")
        for batch_num in tqdm(range(start_batch, mid_point), desc="Processing first half"):
            print(f"\nProcessing batch {batch_num} (Store 1)")
            success = self.process_batch(batch_num, is_first_store=True)
            if success:
                print(f"Current vector count (Store 1): {self.vector_store1.index.ntotal}")
                print(f"Current document count (Store 1): {len(self.vector_store1.docstore._dict)}")
            else:
                print(f"Skipped batch {batch_num}")
        
        # 두 번째 절반 처리 (vector_store2)
        print("\nProcessing second half of batches...")
        for batch_num in tqdm(range(mid_point, max_batch + 1), desc="Processing second half"):
            print(f"\nProcessing batch {batch_num} (Store 2)")
            success = self.process_batch(batch_num, is_first_store=False)
            if success:
                print(f"Current vector count (Store 2): {self.vector_store2.index.ntotal}")
                print(f"Current document count (Store 2): {len(self.vector_store2.docstore._dict)}")
            else:
                print(f"Skipped batch {batch_num}")
        
        # final 배치 처리 (두 번째 스토어에 저장)
        if os.path.exists('embeddings_output/embeddings_batch_final.npy'):
            print("\nProcessing final batch")
            success = self.process_batch('final', is_first_store=False)
            if success:
                print("Final batch processed successfully")
        
        print("\nProcess completed!")
        print(f"Total vectors (Store 1): {self.vector_store1.index.ntotal}")
        print(f"Total documents (Store 1): {len(self.vector_store1.docstore._dict)}")
        print(f"Total vectors (Store 2): {self.vector_store2.index.ntotal}")
        print(f"Total documents (Store 2): {len(self.vector_store2.docstore._dict)}")
        
        # 무결성 검사
        self._verify_integrity()
    
    def _verify_integrity(self):
        # Store 1 무결성 검사
        assert self.vector_store1.index.ntotal == len(self.vector_store1.docstore._dict), "Vector count and document count mismatch in Store 1!"
        assert self.vector_store1.index.ntotal == len(self.vector_store1.index_to_docstore_id), "Vector count and mapping count mismatch in Store 1!"
        
        # Store 2 무결성 검사
        assert self.vector_store2.index.ntotal == len(self.vector_store2.docstore._dict), "Vector count and document count mismatch in Store 2!"
        assert self.vector_store2.index.ntotal == len(self.vector_store2.index_to_docstore_id), "Vector count and mapping count mismatch in Store 2!"
split_store = SplitVectorStore(embeddings)

# 모든 배치 처리
split_store.process_all_batches()

vector_store1 = split_store.vector_store1
vector_store2 = split_store.vector_store2

vector_store1과 vector_store2로 각각 지정해주었다.

쿼리하기

results = vector_store1.similarity_search(
    "쿼리하고 싶은 단어,
    k=1,
    filter={"type": "topic1"},
)
for res in results:
    print(f"* {res.page_content} [{res.metadata}]")

일반적인 similarity 쿼리는 위와 같이 진행한다. filter를 통해서 metadata의 값으로 필터링해줄 수도 있다.

retriever = vector_store1.as_retriever(search_type="mmr", search_kwargs={"k": 1})
retriever.invoke("쿼리하고 싶은 단어, filter={"type": "topic1"})

langchain의 retriever와 같은 형식으로도 쿼리가 가능하고 k를 통해서 상위 몇 개를 출력하고 싶은 지 지정할 수도 있다.

import concurrent.futures
from typing import List, Tuple
from langchain.schema import Document

class SimpleParallelSearcher:
    def __init__(self, split_store):
        self.stores = [
            split_store.vector_store1,
            split_store.vector_store2
        ]
    
    def search_store(self, store, query: str, k: int) -> List[Tuple[Document, float]]:
        """단일 스토어 검색"""
        return store.similarity_search_with_score(query, k=k)
    
    def search(self, query: str, k: int = 4) -> List[Tuple[Document, float]]:
        """병렬 검색 및 결과 병합"""
        # 각 스토어에서 k개의 결과를 가져옴
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_results = [
                executor.submit(self.search_store, store, query, k)
                for store in self.stores
            ]
            
            # 모든 결과 수집
            all_results = []
            for future in concurrent.futures.as_completed(future_results):
                all_results.extend(future.result())
        
        # 거리 기준으로 정렬하고 상위 k개 반환
        sorted_results = sorted(all_results, key=lambda x: x[1])
        return sorted_results[:k]

위에서 두 개의 index로 분리해서 저장했기 때문에 쿼리를 두 index에서 모두 병렬적으로 진행하기 위한 코드이다. 각각 k개씩 뽑은 뒤 다시 재정렬해서 최종 상위 k개를 반환하는 방식이다.

# 검색기 초기화
searcher = SimpleParallelSearcher(split_store)

# 검색 수행
results = searcher.search("쿼리하고 싶은 단어", k=1)

# 결과 출력
for doc, score in results:
    print(f"Score: {score}")
    print(f"Content: {doc.page_content}\n")

앞서 언급했듯, ChromaDB보다는 월등히 빠르지만, index를 쪼개도 rerank하는데 또 한번의 연산이 들어가기 때문에 부화가 생긴다. 더 많이 쪼갠다면 더 빨라질지는 미지수다.

저장하고 재연결하기

# 저장
vector_store.save_local("faiss_test")

로컬에 index와 metadata, content는 각각의 파일로 저장된다. 지정해준 이름은 폴더명이 된다.

import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

vector_store = FAISS.load_local(
    "faiss_test", embeddings, allow_dangerous_deserialization=True
)

docs = vector_store.similarity_search("쿼리하고 싶은 단어")
docs

저장한 index를 다시 불러와서 쿼리를 해보면 똑같은 결과를 얻을 수 있다.


이렇게 FAISS의 성능을 어느 정도 파악해보았다. 대용량 데이터에 대해서는 확실히 쿼리 속도가 빠른게 체감되었다.

그러나 근본적으로 몇 백만 행의 데이터를 임베딩해서 쿼리하는 것에 대한 한계(시간적, 금전적(?, 유료 임베딩 모델을 사용한다면))가 있다는 것을 알게 되었고, 테스트가 아닌 실제 서비스에 적용할지는 고민을 해봐야 할 것 같다.

profile
궁금한 건 많지만, 천천히 알아가는 중입니다

0개의 댓글