Table data를 VectorDB에 저장하고 쿼리하는 고군분투기 (with ChromaDB, FAISS) - (1)

JeongYun Lee·2024년 11월 23일
0

LLM

목록 보기
8/9

main

<두 줄 요약>
1. 대용량 (1000만행 이상) csv 데이터를 임베딩해서
2. FAISS에 저장하고 쿼리하기

(이슈) ChromDB에서 저장한 벡터를 FAISS에 이관하기

지금까지 RAG를 구현할 때에는 ChromaDB를 중심으로 한 vectorDB를 사용했다. ChromaDB를 사용한 이유는 우선 예제가 많기도 했고, 공식 문서가 친절하게 작성되어 있었기 때문이다. (다른 툴을 사용해보니 확실히 잘 정리되어 있는 편이라는 걸 느꼈다 😂) 뿐만 아니라 구현도, 처리도 간단해서 입문용으로 적합한 툴이었던 것 같다.

그럼에도 이번 테스트에는 ChromaDB가 적합하지 않다는 판단을 했다. 이유는 딱 한 가지다.

쿼리 속도가 너무 느려서!!

vectorDB는 쿼리를 할 때, input query를 임베딩해서 저장되어 있는 임베딩 벡터 documents 들과 유사도를 계산해서 출력하는 방식이다 (유사도를 계산하는 방식은 다양함). 따라서 당연히 저장한 documents가 많으면 계산량이 많아지니 속도가 느려질 수 밖에 없는 것이다.

이때까지는 텍스트 기반의 데이터를 chunk 단위로 분리해서 저장했고, 사용한 데이터의 사이즈도 그렇게 크지 않았기 때문에 쿼리 속도가 큰 문제가 아니었다. 그러나 이번에는 csv 데이터, 그것도 1000만행 이상을 저장하고자 해서 쿼리 속도의 부화가 심각했다.

csv 데이터는 일반적으로 RDB나, NOSQL 형식으로 저장하고 쿼리하면 효율적이지만, 이를 위해서 쿼리할 field를 지정해줘야 한다는 근본적 조건이 있다. 하지만, 필자의 경우 쿼리하고 싶은 input이 어떤 field에서 검색을 해야 할지 특정하지 못하거나 input이 여러 field에 걸쳐 있는 문제가 있었다. field 후보를 줄 순 있지만, or 조건으로 검색하긴에 다소 애매... 하다고 생각해서대안으로 임베딩을 하고 쿼리를 하면 되지 않을까? 라는 사고로 이어진 것이다.
(아직 테스트 해보진 않았지만, 추후 Elasticsearch도 적용해보고 싶다)

그래서 FAISS는 쿼리 속도가 빠른가? 테스트 해 본 결과 (최소한 필자의 경우에는) ChromaDB와 비교할 수 없을 정도로 빨랐다. 물론 1000만행을 하나의 store에 넣어서 쿼리하는건 여전히 무리인 것 같지만, 그래도 기다릴만한 속도를 보여주긴 한다.

아 참고로, FAISS가 ChromaDB보다 빠른 이유는 C++로 만들어서, 벡터 쿼리에만 포커싱한 DB라서 등등의 이유가 있다고 한다.

.
.
.

서론이 매우 길었다. 그래서 이 글에서는 ChromaDB에 대용량 데이터를 넣는 방법과 여기서 생성한 벡터를 FAISS에 migrate하는 방법을 다루고자 한다. 물론 그냥 FAISS에서 바로 임베딩하고 저장하면 된다. 필자는 ChromaDB로 이미 테스트하면서 25달러나 써서 임베딩을 했기 때문에 이를 재활용하기 위해서 migrate를 해야만 했으므로... 200만행을 임베딩하는데 (컬럼수는 10개 내외, 한 행을 join한 것을 document로 사용) openai API의 text-embedding-3-large를 기준으로 25$가 부가됐다.

이미 임베딩된 벡터를 insert하는 방법을 알아내느라 꽤 애먹었는데, 하고 나니 제법 간단했다.


ChromaDB

ChromaDB에 데이터를 저장하는 방법은 이전 포스트들에서 다룬 적이 있으니 자세한 설명은 생략하겠다.

Connect

import chromadb
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
# from chromadb.utils import embedding_functions
import chromadb.utils.embedding_functions as embedding_functions

chromadb.__version__

client = chromadb.PersistentClient('./data/chroma/')
# chroma_client = chromadb.HttpClient(host="localhost", port=8000) # docker
client.list_collections()

나는 로컬에 저장하기 때문에 PersistentClient를 사용하는데, docker를 사용하거나 다른 방법을 사용할 수도 있다.

# collection 생성
collection = client.get_or_create_collection(name="컬렉션이름")
collection.count()

Insert

임베딩할 데이터와 metadata에 넣어줄 데이터가 달랐기 때문에 df_list에는 여러개의 csv 데이터 변수명을 넣어줬고, metadata에는 csv를 json형태로 바꾼 값을 넣어줬다.

claude의 최적화를 거쳤다.

openai_ef = embedding_functions.OpenAIEmbeddingFunction(
                api_key=OPENAI_API_KEY,
                model_name="text-embedding-3-large"
            )

collection = client.get_collection(name="컬렉션이름", embedding_function=openai_ef)

BATCH_SIZE = 1000  # 배치 크기 설정 (chromadb의 max가 10000인 듯)

# 이미 존재하는 ID 집합 가져오기
existing_ids = set(collection.get()['ids'])  # 한 번만 실행

for df_name in tqdm(df_list, desc="Total processing", position=1):
    df = globals()[df_name]
    df_json_name = df_name.replace('_select', '_json')
    df_json = result_dict[df_json_name]
    
    # ID 컬럼 확인
    id_col = '식별자' if '식별자' in df.columns else 'identifier'
    
    # 필요한 메타데이터 필드 정의
    metadata_fields = ['type', 'identifier', 'location_identifier']
    
    # 배치 처리를 위한 리스트
    batch_ids = []
    batch_documents = []
    batch_metadatas = []
    
    skipped_count = 0  # 건너뛴 항목 수 추적
    
    for i, row in tqdm(enumerate(df_json), desc=f"Processing {df_name}", position=0, leave=False):
        current_id = str(row[id_col])
        
        # 이미 존재하는 ID면 건너뛰기
        if current_id in existing_ids:
            skipped_count += 1
            continue
            
        # 임베딩용 텍스트 준비
        embed_text = ' '.join(str(v) for k, v in row.items())
        
        # 존재하는 메타데이터 필드만 선택
        metadata = {field: row[field] for field in metadata_fields if field in row}
        
        # 배치에 추가
        batch_ids.append(current_id)
        batch_documents.append(embed_text)
        batch_metadatas.append(metadata)
        
        # 현재 ID를 existing_ids에 추가
        existing_ids.add(current_id)
        
        # 배치 크기에 도달하거나 마지막 데이터일 때 저장
        if len(batch_ids) >= BATCH_SIZE or i == len(df_json) - 1:
            if batch_ids:  # 배치가 비어있지 않은 경우에만 실행
                collection.add(
                    ids=batch_ids,
                    documents=batch_documents,
                    metadatas=batch_metadatas
                )
            # 배치 초기화
            batch_ids = []
            batch_documents = []
            batch_metadatas = []
    
    print(f"{df_name} 처리 완료: {skipped_count}개 항목 건너뜀")

OPENAI_API_KEY는 .env 파일에 추가해서 가져왔고, model은 text-embedding-3-large를 사용했다. batch를 활용해서 속도를 높였고 (max가 10000인지 이상을 주면 계속 에러가 발생함) 이미 저장되어 있는 식별자의 경우 pass되도록 추가해줬다.

Query

# 저장된 데이터 수
collection.count()
# id기준으로 찾기
collection.get(
	ids=["d19-e28"]
)

이때 collection.get을 하면 embeddings 부분은 None이 나온다. 내 추측이지만, 아마도 빠르게 쿼리하기 위해서 임베딩 벡터는 제외하고 인덱싱된 metadata만 사용해서 쿼리하기 때문인 것 같다. 그래서 get의 속도는 굉장히 빠르다

# metadata를 활용한 filtering
 collection.get(where={"type": 'name'})
collection.get(
    where={"$or": [{"type": "주소정보안내판"}, {"type": "명예도로명판"}, {"type":"예고용도로명판"}, {"type":"이면도로도로명판"}]},
)

이런식으로 where를 활용해서 metadata의 key를 활용해서 쿼리해줄 수도 있다. 이때 or나 and 조건을 추가할 수도 있으나 속도가 살짝 느려지긴 한다.

results = collection.query(
    query_texts=["스타벅스 광화문점"],
    n_results=2,
    where={"type": "카페"}
)

collection.query는 벡터의 유사도를 계산해서 쿼리하는 방식이다. 이 쿼리가 사실 필요한데, 저장한 데이터의 양이 많아지니까 이 쿼리가 끝나지 않았다! 앞서 언급한 이유에 의해서 계산 부화가 너무 심하게 된다고 생각했다.

Chroma.sqlite3

chromadb 기본적으로 sqlite에 벡터와 metadata, document를 저장한다. 로컬에 저장한 경우 해당 경로 폴더에 들어가면 chroma.sqlite3라는 폴더가 있을 것이다. 이때까지, 이폴더에 대해서 크게 신경쓰지 않았는데, 이번에 저장된 벡터를 보다 효율적으로 가져오기 위해서 sqlite를 사용해보려 했다. 그러나... 내가 저장한 모든 벡터와 metadata를 다 가져오는데는 결국 실패했고, 아직도 정확한 원인은 잘 모르겠다. 최종적으로는 chromadb에서 collection.get에서 include를 활용하는 쿼리를 활용했다. 참고할 수 있는 sqlite의 테이블 안을 확인하는 코드는 다음과 같다.

import sqlite3

db_path = "./data/chroma/chroma.sqlite3"  
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# 모든 테이블 목록 가져오기
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()

for table in tables:
    table_name = table[0]
    print(f"\n===== {table_name} =====")
    
    # 테이블 구조 확인
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = cursor.fetchall()
    print("\nColumns:")
    for col in columns:
        print(f"- {col[1]} ({col[2]})")
    
    # 테이블 데이터 샘플 확인
    try:
        cursor.execute(f"SELECT * FROM {table_name} LIMIT 1")
        sample = cursor.fetchone()
        print("\nSample data:")
        print(sample)
        
        # 행 수 확인
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        count = cursor.fetchone()[0]
        print(f"\nTotal rows: {count}")
    except Exception as e:
        print(f"\nError reading data: {e}")

conn.close()

sqlite에 저장된 모든 테이블과, 각 테이블의 컬럼, 행수를 확인하는 코드이다. 이 중 필요한 테이블은 embeddings_queue으로, 이 테이블 안에 우리가 필요한 벡터, 메타데이터, 도큐먼트가 모두 저장되어 있다. (전체 데이터는 아닌듯?)

import sqlite3
import numpy as np

db_path = "./data/chroma/chroma.sqlite3"  
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# 모든 collection 정보 가져오기
cursor.execute("""
    SELECT id, name
    FROM collections 
""")
collections = cursor.fetchall()

print("Available collections:")
for collection in collections:
    print(f"ID: {collection[0]}, Name: {collection[1]}")

conn.close()

SQL 쿼리를 통해 저장된 collection을 확인할 수도 있다.

(최종) Vector, Documents, Metadata를 저장

결국 collection.getinclude를 활용하기로 했다. 다만 batch를 활용해서 속도를 최대한 올렸고, 메모리를 주기적으로 정리해줘서 효율을 높였다. batch는 10000이상으로 올리면 ChromaDB 쪽에서 에러가 나는 것 같아 더 올리진 못했다.

이 역시 claude의 최적화를 거쳤다.

import chromadb
import numpy as np
import os, json, glob

# ChromaDB 클라이언트 초기화
client = chromadb.PersistentClient(path="./data/chroma")
collection = client.get_collection("컬렉션이름")

# 결과를 저장할 디렉토리 생성
output_dir = "embeddings_output"
os.makedirs(output_dir, exist_ok=True)

# 이미 처리된 배치 번호 확인
existing_batches = []
for file in glob.glob(os.path.join(output_dir, "embeddings_batch_*.npy")):
    batch_name = file.split('_')[-1].split('.')[0]
    if batch_name.isdigit():  # 숫자인 경우만 처리
        existing_batches.append(int(batch_name))

last_processed_batch = max(existing_batches) if existing_batches else 0
last_processed_count = last_processed_batch * 50000

print(f"Last processed batch: {last_processed_batch}")
print(f"Continuing from record: {last_processed_count}")

# 배치 처리 설정
batch_size = 10000
processed_count = last_processed_count
all_vectors = []
all_documents = []
all_metadatas = []

try:
    total_count = collection.count()
    print(f"Total records in collection: {total_count}")
    print(f"Resuming from record {processed_count}")
    
    # 배치 처리
    while processed_count < total_count:
        # 배치 데이터 가져오기
        batch_data = collection.get(
            limit=batch_size,
            offset=processed_count,
            include=['embeddings', 'documents', 'metadatas']
        )
        
        batch_embeddings = np.array(batch_data['embeddings'])
        batch_documents = batch_data['documents']
        batch_metadatas = batch_data['metadatas']
        
        all_vectors.append(batch_embeddings)
        all_documents.extend(batch_documents)
        all_metadatas.extend(batch_metadatas)
        
        processed_count += len(batch_embeddings)
        print(f"Processed {processed_count}/{total_count} records")
        
        # 메모리 관리를 위해 주기적으로 저장하고 메모리 정리
        if processed_count % 50000 == 0:
            batch_num = processed_count // 50000
            if batch_num not in existing_batches:  # 이미 존재하는 배치는 건너뛰기
                # Embeddings 저장
                current_vectors = np.vstack(all_vectors)
                np.save(os.path.join(output_dir, f"embeddings_batch_{batch_num}.npy"), current_vectors)
                
                # Documents 저장
                with open(os.path.join(output_dir, f"documents_batch_{batch_num}.json"), "w") as doc_file:
                    json.dump(all_documents, doc_file, ensure_ascii=False, indent=4)
                
                # Metadata 저장
                with open(os.path.join(output_dir, f"metadatas_batch_{batch_num}.json"), "w") as meta_file:
                    json.dump(all_metadatas, meta_file, ensure_ascii=False, indent=4)
            
            # 메모리 정리
            all_vectors = []
            all_documents = []
            all_metadatas = []
            
    # 남은 데이터 처리
    if all_vectors:
        final_vectors = np.vstack(all_vectors)
        np.save(os.path.join(output_dir, "embeddings_batch_final.npy"), final_vectors)
    if all_documents:
        with open(os.path.join(output_dir, "documents_batch_final.json"), "w") as doc_file:
            json.dump(all_documents, doc_file, ensure_ascii=False, indent=4)
    if all_metadatas:
        with open(os.path.join(output_dir, "metadatas_batch_final.json"), "w") as meta_file:
            json.dump(all_metadatas, meta_file, ensure_ascii=False, indent=4)

except Exception as e:
    print(f"\nError occurred: {str(e)}")
    raise

finally:
    print("\nProcess completed")
    print(f"Embeddings, documents, and metadatas saved in {output_dir} directory")

200만행을 처리하는데 생각보다 오래걸리지 않았다. 벡터는 npy로 저장되고, documents와 metadata는 json 형식으로 저장된다. 수많은 파일들이 저장되고 몇십 기가의 용량을 차지하니 꼭 저장공간을 확인하길 바란다.
.
.
.
이제 저장한 데이터를 FAISS에 저장하면 끝!
(2)편에서 이어서 작성하는 걸로~

profile
궁금한 건 많지만, 천천히 알아가는 중입니다

0개의 댓글