FAISS는 Meta(구 Facebook)에서 개발한 고성능 벡터 유사도 검색 라이브러리입니다. 수십억 개의 고차원 벡터에서 빠른 근사 최근접 이웃(Approximate Nearest Neighbor, ANN) 검색을 가능하게 합니다.
# 기본 개념: 쿼리 벡터와 가장 유사한 k개 벡터 찾기
query_vector = [0.1, 0.2, 0.3, ...] # 검색할 벡터
k = 5 # 찾을 유사 벡터 개수
distances, indices = index.search(query_vector, k)
FAISS는 다양한 인덱스 타입을 제공하며, 각각 다른 특성을 가집니다:
# CPU 버전
pip install faiss-cpu
# GPU 버전 (CUDA 필요)
pip install faiss-gpu
# conda 설치
conda install -c pytorch faiss-cpu
conda install -c pytorch faiss-gpu
import numpy as np
import faiss
# 1. 데이터 준비
dimension = 128 # 벡터 차원
n_vectors = 10000 # 벡터 개수
# 랜덤 벡터 생성 (실제로는 임베딩 모델에서 생성)
vectors = np.random.random((n_vectors, dimension)).astype('float32')
# 2. 인덱스 생성
index = faiss.IndexFlatL2(dimension) # L2 거리 기반 정확한 검색
# 3. 벡터 추가
index.add(vectors)
print(f"인덱스에 저장된 벡터 수: {index.ntotal}")
# 4. 검색
k = 5 # 상위 5개 결과
query_vector = np.random.random((1, dimension)).astype('float32')
distances, indices = index.search(query_vector, k)
print(f"거리: {distances}")
print(f"인덱스: {indices}")
IndexFlatL2: L2(유클리드) 거리 기반
index = faiss.IndexFlatL2(dimension)
IndexFlatIP: 내적(Inner Product) 기반
index = faiss.IndexFlatIP(dimension)
특징:
기본 원리:
1. 벡터 공간을 여러 클러스터로 분할
2. 각 벡터를 가장 가까운 클러스터에 할당
3. 검색 시 일부 클러스터만 탐색
# IVF 인덱스 생성
nlist = 100 # 클러스터 수
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
# 훈련 (클러스터링)
index.train(vectors)
index.add(vectors)
# 검색 시 탐색할 클러스터 수 설정
index.nprobe = 10
distances, indices = index.search(query_vector, k)
매개변수 조절:
nlist: 클러스터 수 (많을수록 정확하지만 메모리 사용량 증가)nprobe: 검색 시 탐색할 클러스터 수 (많을수록 정확하지만 느림)압축 원리:
벡터를 작은 부분으로 나누고 각 부분을 코드북으로 양자화
# PQ 인덱스
m = 8 # 서브벡터 개수
bits = 8 # 서브벡터당 비트 수
index = faiss.IndexPQ(dimension, m, bits)
index.train(vectors)
index.add(vectors)
메모리 계산:
그래프 기반 검색:
계층적 그래프 구조로 빠른 근사 검색
# HNSW 인덱스
M = 32 # 연결 수
index = faiss.IndexHNSWFlat(dimension, M)
index.add(vectors)
특징:
여러 기법을 조합한 고급 인덱스:
# IVF + PQ 조합
nlist = 100
m = 8
bits = 8
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, bits)
index.train(vectors)
index.add(vectors)
| 데이터 크기 | 정확도 요구 | 메모리 제한 | 추천 인덱스 |
|---|---|---|---|
| <10만 | 높음 | 없음 | IndexFlatL2 |
| 10만-100만 | 중간 | 있음 | IndexIVFFlat |
| >100만 | 중간 | 있음 | IndexIVFPQ |
| >1000만 | 낮음 | 심함 | IndexPQ |
| 모든 크기 | 높음 | 중간 | IndexHNSWFlat |
IVF 매개변수:
# 일반적인 가이드라인
import math
nlist = int(4 * math.sqrt(n_vectors)) # 클러스터 수
nprobe = int(nlist / 10) # 검색할 클러스터 수
# 정확도 vs 속도 조절
index.nprobe = nprobe # 높일수록 정확하지만 느림
PQ 매개변수:
# 차원이 m으로 나누어떨어져야 함
m = 8 # 일반적으로 8, 16, 32 사용
bits = 8 # 8비트가 일반적 (256개 클러스터)
# 압축률 = dimension * 4 / m
compression_ratio = dimension * 4 / m
# GPU 사용 가능 여부 확인
print(f"GPU 사용 가능: {faiss.get_num_gpus()}")
# GPU 인덱스로 변환
if faiss.get_num_gpus() > 0:
gpu_index = faiss.index_cpu_to_gpu(
faiss.StandardGpuResources(), # GPU 리소스
0, # GPU 번호
index # CPU 인덱스
)
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
class TextSearchEngine:
def __init__(self, model_name='all-MiniLM-L6-v2'):
self.model = SentenceTransformer(model_name)
self.index = None
self.texts = []
def build_index(self, texts, index_type='IVFFlat'):
"""텍스트로부터 인덱스 구축"""
self.texts = texts
# 임베딩 생성
embeddings = self.model.encode(texts)
embeddings = embeddings.astype('float32')
dimension = embeddings.shape[1]
n_vectors = embeddings.shape[0]
# 인덱스 타입에 따른 생성
if index_type == 'Flat':
self.index = faiss.IndexFlatL2(dimension)
elif index_type == 'IVFFlat':
nlist = min(int(4 * np.sqrt(n_vectors)), n_vectors // 10)
quantizer = faiss.IndexFlatL2(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
self.index.train(embeddings)
elif index_type == 'HNSW':
self.index = faiss.IndexHNSWFlat(dimension, 32)
self.index.add(embeddings)
def search(self, query, k=5):
"""쿼리 검색"""
query_embedding = self.model.encode([query]).astype('float32')
if hasattr(self.index, 'nprobe'):
self.index.nprobe = 10
distances, indices = self.index.search(query_embedding, k)
results = []
for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
if idx != -1: # 유효한 결과
results.append({
'text': self.texts[idx],
'distance': float(dist),
'rank': i + 1
})
return results
def save_index(self, filepath):
"""인덱스 저장"""
faiss.write_index(self.index, filepath)
def load_index(self, filepath):
"""인덱스 로드"""
self.index = faiss.read_index(filepath)
# 사용 예제
texts = [
"FAISS는 빠른 벡터 검색 라이브러리입니다.",
"파이썬으로 머신러닝을 구현할 수 있습니다.",
"임베딩은 텍스트를 벡터로 변환합니다.",
"검색 엔진은 정보 검색에 사용됩니다.",
"자연어 처리는 AI의 한 분야입니다."
]
engine = TextSearchEngine()
engine.build_index(texts, 'IVFFlat')
results = engine.search("벡터 검색이란?", k=3)
for result in results:
print(f"순위 {result['rank']}: {result['text']} (거리: {result['distance']:.4f})")
import faiss
import numpy as np
from PIL import Image
import torch
import torchvision.transforms as transforms
import torchvision.models as models
class ImageSearchEngine:
def __init__(self):
# 사전 훈련된 ResNet 모델 로드
self.model = models.resnet50(pretrained=True)
self.model.fc = torch.nn.Identity() # 마지막 분류 레이어 제거
self.model.eval()
# 이미지 전처리
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
self.index = None
self.image_paths = []
def extract_features(self, image_path):
"""이미지에서 특징 벡터 추출"""
image = Image.open(image_path).convert('RGB')
image = self.transform(image).unsqueeze(0)
with torch.no_grad():
features = self.model(image)
return features.numpy().astype('float32')
def build_index(self, image_paths):
"""이미지 인덱스 구축"""
self.image_paths = image_paths
embeddings = []
for path in image_paths:
features = self.extract_features(path)
embeddings.append(features[0])
embeddings = np.array(embeddings)
dimension = embeddings.shape[1]
# HNSW 인덱스 사용 (이미지 검색에 효과적)
self.index = faiss.IndexHNSWFlat(dimension, 32)
self.index.add(embeddings)
def search_similar_images(self, query_image_path, k=5):
"""유사한 이미지 검색"""
query_features = self.extract_features(query_image_path)
distances, indices = self.index.search(query_features, k)
results = []
for dist, idx in zip(distances[0], indices[0]):
if idx != -1:
results.append({
'image_path': self.image_paths[idx],
'similarity': 1 / (1 + dist) # 거리를 유사도로 변환
})
return results
def build_large_index(embeddings_generator, dimension, batch_size=10000):
"""대용량 데이터를 배치로 처리"""
# 인덱스 초기화
nlist = 1000
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, 8, 8)
# 첫 번째 배치로 훈련
first_batch = next(embeddings_generator)
index.train(first_batch)
index.add(first_batch)
# 나머지 배치 추가
for batch in embeddings_generator:
index.add(batch)
print(f"현재 인덱스 크기: {index.ntotal}")
return index
# 메모리에 들어가지 않는 대용량 인덱스
def create_disk_index(vectors, index_path):
"""디스크 기반 인덱스 생성"""
dimension = vectors.shape[1]
# 온디스크 인덱스 생성
index = faiss.index_factory(dimension, "IVF1024,PQ64")
# 훈련
index.train(vectors[:100000]) # 샘플로 훈련
# 배치로 추가
batch_size = 10000
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i+batch_size]
index.add(batch)
# 디스크에 저장
faiss.write_index(index, index_path)
return index
import time
import matplotlib.pyplot as plt
def benchmark_indexes(vectors, query_vectors, k=10):
"""다양한 인덱스 성능 비교"""
dimension = vectors.shape[1]
results = {}
# 테스트할 인덱스들
indexes = {
'Flat': faiss.IndexFlatL2(dimension),
'IVF': faiss.IndexIVFFlat(faiss.IndexFlatL2(dimension), dimension, 100),
'HNSW': faiss.IndexHNSWFlat(dimension, 32),
'PQ': faiss.IndexPQ(dimension, 8, 8)
}
for name, index in indexes.items():
# 훈련 (필요한 경우)
if hasattr(index, 'train'):
start_time = time.time()
index.train(vectors)
train_time = time.time() - start_time
else:
train_time = 0
# 인덱스 구축
start_time = time.time()
index.add(vectors)
build_time = time.time() - start_time
# 검색 성능
start_time = time.time()
distances, indices = index.search(query_vectors, k)
search_time = time.time() - start_time
results[name] = {
'train_time': train_time,
'build_time': build_time,
'search_time': search_time,
'memory_usage': index.ntotal * dimension * 4 / (1024**2) # MB
}
return results
# 벤치마크 실행
n_vectors = 100000
dimension = 128
vectors = np.random.random((n_vectors, dimension)).astype('float32')
query_vectors = np.random.random((1000, dimension)).astype('float32')
results = benchmark_indexes(vectors, query_vectors)
# 결과 출력
for name, metrics in results.items():
print(f"{name}:")
print(f" 훈련 시간: {metrics['train_time']:.2f}초")
print(f" 구축 시간: {metrics['build_time']:.2f}초")
print(f" 검색 시간: {metrics['search_time']:.4f}초")
print(f" 메모리 사용량: {metrics['memory_usage']:.1f}MB")
print()
def faiss_clustering(vectors, n_clusters):
"""FAISS를 이용한 K-means 클러스터링"""
dimension = vectors.shape[1]
# K-means 클러스터링
kmeans = faiss.Kmeans(dimension, n_clusters, niter=20, verbose=True)
kmeans.train(vectors)
# 클러스터 할당
_, cluster_assignments = kmeans.index.search(vectors, 1)
return kmeans.centroids, cluster_assignments.flatten()
# 사용 예제
centroids, assignments = faiss_clustering(vectors, 50)
def merge_indexes(index1, index2):
"""두 개의 인덱스 병합"""
# 인덱스1의 벡터를 인덱스2에 추가
vectors1 = index1.reconstruct_n(0, index1.ntotal)
index2.add(vectors1)
return index2
class FilteredIndex:
"""조건부 검색을 위한 필터링된 인덱스"""
def __init__(self, base_index):
self.base_index = base_index
self.metadata = []
def add_with_metadata(self, vectors, metadata):
"""메타데이터와 함께 벡터 추가"""
self.base_index.add(vectors)
self.metadata.extend(metadata)
def search_with_filter(self, query, k, filter_func):
"""필터 조건을 만족하는 벡터만 검색"""
# 더 많은 후보를 검색
distances, indices = self.base_index.search(query, k * 10)
filtered_results = []
for dist, idx in zip(distances[0], indices[0]):
if idx != -1 and filter_func(self.metadata[idx]):
filtered_results.append((dist, idx))
if len(filtered_results) >= k:
break
return filtered_results
class MemoryEfficientIndex:
"""메모리 효율적인 인덱스 관리"""
def __init__(self, dimension, max_memory_mb=1000):
self.dimension = dimension
self.max_memory_mb = max_memory_mb
self.indexes = []
self.current_index = None
def add_vectors(self, vectors):
"""메모리 제한을 고려하여 벡터 추가"""
estimated_memory = len(vectors) * self.dimension * 4 / (1024**2)
if estimated_memory > self.max_memory_mb:
# 새 인덱스 생성
self._create_new_index()
self.current_index.add(vectors)
def _create_new_index(self):
"""새 인덱스 생성"""
if self.current_index is not None:
self.indexes.append(self.current_index)
self.current_index = faiss.IndexFlatL2(self.dimension)
def search(self, query, k):
"""모든 인덱스에서 검색하여 결합"""
all_distances = []
all_indices = []
offset = 0
for index in self.indexes + [self.current_index]:
if index is not None:
distances, indices = index.search(query, k)
all_distances.extend(distances[0])
all_indices.extend(indices[0] + offset)
offset += index.ntotal
# 상위 k개 선택
sorted_results = sorted(zip(all_distances, all_indices))[:k]
return zip(*sorted_results) if sorted_results else ([], [])
class UpdatableIndex:
"""업데이트 가능한 인덱스"""
def __init__(self, dimension):
self.dimension = dimension
self.main_index = faiss.IndexFlatL2(dimension)
self.delta_index = faiss.IndexFlatL2(dimension)
self.deleted_ids = set()
def add(self, vectors, ids=None):
"""벡터 추가"""
if ids is None:
ids = range(self.main_index.ntotal,
self.main_index.ntotal + len(vectors))
self.delta_index.add(vectors)
# 일정 크기가 되면 메인 인덱스와 병합
if self.delta_index.ntotal > 10000:
self._merge_indexes()
def delete(self, ids):
"""벡터 삭제 (소프트 삭제)"""
self.deleted_ids.update(ids)
def search(self, query, k):
"""삭제된 항목을 제외하고 검색"""
# 더 많은 결과를 검색하여 삭제된 항목 필터링
search_k = k + len(self.deleted_ids)
# 메인 인덱스 검색
distances1, indices1 = self.main_index.search(query, search_k)
# 델타 인덱스 검색
distances2, indices2 = self.delta_index.search(query, search_k)
# 결과 병합 및 필터링
all_results = []
for d, i in zip(distances1[0], indices1[0]):
if i not in self.deleted_ids:
all_results.append((d, i))
for d, i in zip(distances2[0], indices2[0]):
if i not in self.deleted_ids:
all_results.append((d, i + self.main_index.ntotal))
# 정렬 후 상위 k개 반환
all_results.sort()
return all_results[:k]
def _merge_indexes(self):
"""델타 인덱스를 메인 인덱스와 병합"""
# 구현 생략 (복잡한 로직)
pass
메모리 부족:
# 해결책: 배치 처리나 압축 인덱스 사용
def memory_safe_add(index, vectors, batch_size=1000):
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i+batch_size]
index.add(batch)
검색 품질 저하:
# 해결책: 매개변수 튜닝
def tune_parameters(index, queries, ground_truth):
best_recall = 0
best_nprobe = 1
for nprobe in [1, 5, 10, 20, 50]:
index.nprobe = nprobe
_, indices = index.search(queries, 10)
recall = calculate_recall(indices, ground_truth)
if recall > best_recall:
best_recall = recall
best_nprobe = nprobe
return best_nprobe
import psutil
import time
class PerformanceMonitor:
"""FAISS 성능 모니터링"""
def __init__(self):
self.metrics = []
def monitor_search(self, index, queries, k=10):
"""검색 성능 모니터링"""
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024
distances, indices = index.search(queries, k)
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024
metrics = {
'search_time': end_time - start_time,
'memory_used': end_memory - start_memory,
'queries_per_second': len(queries) / (end_time - start_time),
'avg_distance': float(np.mean(distances))
}
self.metrics.append(metrics)
return metrics