
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
# 샘플 문서
documents = [
"This is a sample document.",
"This document is another sample.",
"And this is yet another document, another sample.",
]
# TF-IDF 벡터화
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(documents)
# TF-IDF 행렬을 DataFrame으로 변환
df_tfidf = pd.DataFrame(tfidf_matrix.toarray(), columns=vectorizer.get_feature_names_out())
print("TF-IDF 행렬:")
print(df_tfidf)
# 코사인 유사도 계산
cosine_sim = cosine_similarity(tfidf_matrix)
similarity_df = pd.DataFrame(cosine_sim)
print("\n코사인 유사도 행렬:")
print(similarity_df)
어쨌든 이런 Sparse Embedding의 한계를 극복하기 위해 Dense Embedding이 등장했으며, 이는 더 작은 차원의 고밀도 벡터를 사용하여 단어의 의미와 맥락을 더 잘 표현할 수 있게 됨.
from transformers import AutoTokenizer, BertModel
model_checkpoint = 'bert-base-multilingual-cased'
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = BertModel.from_pretrained(model_checkpoint)
text = "This is an example sentence"
inputs = tokenizer(text, return_tensors="pt")
outputs = model(**inputs)
dense_embedding = outputs.last_hidden_state[:, 0, :] # [CLS] token embedding
주로 분류 문제에서 사용되는 손실 함수로, 모델이 예측한 확률 분포와 실제 레이블 간의 차이를 측정한다. NLL Loss는 모델이 예측한 확률이 실제 레이블과 일치할수록 낮아지며, 반대로 예측이 틀릴수록 높아진다.
수학적 정의
은 데이터 포인트의 수
는 데이터 포인트 의 실제 레이블
는 데이터 포인트 의 입력
는 모델이 를 입력으로 받아 일 확률을 예측한 값
양성 문서(positive passage)에 대한 손실은 다음과 같이 정의됨.
실제 계산 예시
import torch
from transformers import BertModel, BertTokenizer
# 모델과 토크나이저 로드
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
# 입력 예시
question = "What is the capital of France?"
positive_passage = "Paris is the capital of France."
negative_passage = "Berlin is the capital of Germany."
# 토큰화 및 인코딩
inputs_q = tokenizer(question, return_tensors='pt')
inputs_p_pos = tokenizer(positive_passage, return_tensors='pt')
inputs_p_neg = tokenizer(negative_passage, return_tensors='pt')
# 임베딩 생성
with torch.no_grad():
outputs_q = model(**inputs_q)
outputs_p_pos = model(**inputs_p_pos)
outputs_p_neg = model(**inputs_p_neg)
# [CLS] 토큰의 출력 사용
h_q = outputs_q.last_hidden_state[:, 0, :]
h_p_pos = outputs_p_pos.last_hidden_state[:, 0, :]
h_p_neg = outputs_p_neg.last_hidden_state[:, 0, :]
# 유사도 계산 (내적)
sim_pos = torch.matmul(h_q, h_p_pos.T)
sim_neg = torch.matmul(h_q, h_p_neg.T)
# 손실 계산 (NLL)
loss_fn = torch.nn.CrossEntropyLoss()
logits = torch.cat([sim_pos, sim_neg], dim=1)
labels = torch.zeros(logits.size(0), dtype=torch.long) # 양성 샘플의 인덱스는 0
loss = loss_fn(logits, labels)
print(f"Loss: {loss.item()}")
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import DPRContextEncoder, DPRQuestionEncoder, DPRContextEncoderTokenizer, DPRQuestionEncoderTokenizer, AdamW
from datasets import load_dataset
import faiss
import numpy as np
# 디바이스 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# SQuAD 데이터셋 로드
squad_dataset = load_dataset("squad", split="train[:5000]") # 예시를 위해 5000개만 사용
# DPR 모델 및 토크나이저 로드
ctx_encoder = DPRContextEncoder.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base").to(device)
ctx_tokenizer = DPRContextEncoderTokenizer.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
q_encoder = DPRQuestionEncoder.from_pretrained("facebook/dpr-question_encoder-single-nq-base").to(device)
q_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
# 데이터셋 클래스 정의
class SQuADDataset(Dataset):
def __init__(self, dataset, q_tokenizer, ctx_tokenizer):
self.dataset = dataset
self.q_tokenizer = q_tokenizer
self.ctx_tokenizer = ctx_tokenizer
def __len__(self):
return len(self.dataset)
def __getitem__(self, idx):
item = self.dataset[idx]
question = self.q_tokenizer(item['question'], truncation=True, padding='max_length', max_length=128, return_tensors='pt')
context = self.ctx_tokenizer(item['context'], truncation=True, padding='max_length', max_length=512, return_tensors='pt')
return {
'question_input_ids': question['input_ids'].squeeze(),
'question_attention_mask': question['attention_mask'].squeeze(),
'context_input_ids': context['input_ids'].squeeze(),
'context_attention_mask': context['attention_mask'].squeeze(),
}
# 데이터셋 및 데이터로더 생성
train_dataset = SQuADDataset(squad_dataset, q_tokenizer, ctx_tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
# 손실 함수 및 옵티마이저 정의
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = AdamW(list(q_encoder.parameters()) + list(ctx_encoder.parameters()), lr=2e-5)
# 학습 함수
def train(q_encoder, ctx_encoder, train_dataloader, optimizer, epochs=3):
for epoch in range(epochs):
q_encoder.train()
ctx_encoder.train()
total_loss = 0
for batch in train_dataloader:
optimizer.zero_grad()
q_inputs = {'input_ids': batch['question_input_ids'].to(device),
'attention_mask': batch['question_attention_mask'].to(device)}
ctx_inputs = {'input_ids': batch['context_input_ids'].to(device),
'attention_mask': batch['context_attention_mask'].to(device)}
q_embeddings = q_encoder(**q_inputs).pooler_output
ctx_embeddings = ctx_encoder(**ctx_inputs).pooler_output
sim_scores = torch.matmul(q_embeddings, ctx_embeddings.t())
labels = torch.arange(sim_scores.size(0)).to(device)
loss = loss_fn(sim_scores, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_dataloader):.4f}")
# 모델 학습
train(q_encoder, ctx_encoder, train_dataloader, optimizer)
# 문맥 임베딩 생성 함수
def get_context_embedding(context):
inputs = ctx_tokenizer(context, return_tensors="pt", max_length=512, truncation=True).to(device)
with torch.no_grad():
embeddings = ctx_encoder(**inputs).pooler_output
return embeddings.cpu().numpy()
# 질문 임베딩 생성 함수
def get_question_embedding(question):
inputs = q_tokenizer(question, return_tensors="pt", max_length=512, truncation=True).to(device)
with torch.no_grad():
embeddings = q_encoder(**inputs).pooler_output
return embeddings.cpu().numpy()
# 문맥 임베딩 생성
context_embeddings = [get_context_embedding(ctx) for ctx in squad_dataset["context"]]
context_embeddings = np.vstack(context_embeddings)
# FAISS 인덱스 생성 및 추가
d = context_embeddings.shape[1] # 임베딩 차원
index = faiss.IndexFlatIP(d)
index.add(context_embeddings.astype('float32'))
# 검색 함수
def search(query, top_k=5):
question_embedding = get_question_embedding(query)
scores, indices = index.search(question_embedding, top_k)
return [(squad_dataset[int(i)]["context"], squad_dataset[int(i)]["question"], float(s)) for i, s in zip(indices[0], scores[0])]
# 테스트
test_query = "What is the capital of France?"
results = search(test_query)
print(f"Query: {test_query}\n")
for i, (context, original_question, score) in enumerate(results, 1):
print(f"Result {i}:")
print(f"Context: {context[:100]}...")
print(f"Original Question: {original_question}")
print(f"Similarity Score: {score:.4f}\n")
FAISS?
- FAISS(Facebook AI Similarity Search)란, 대규모 벡터 데이터베이스에서 효율적인 유사성 검색을 수행하기 위한 라이브러리이다.
- Inner Product 또는 Cosine Similarity를 사용하여 벡터 간 유사성을 계산할 수 있다.
- 대규모 데이터셋에서도 빠른 검색이 가능하다.
이하 FAISS를 활용한 기본 예제 코드 참고
import numpy as np
import faiss
# 데이터 준비
d = 768 # BERT 임베딩의 차원
nb = len(document_embeddings) # 문서 수
xb = np.array(document_embeddings).astype('float32')
# FAISS 인덱스 생성
index = faiss.IndexFlatIP(d) # 내적(Inner Product) 유사도 사용
index.add(xb) # 문서 임베딩을 인덱스에 추가
# 쿼리 처리 함수
def retrieve_faiss(query_or_dataset, topk=1):
with torch.no_grad():
query_vec = model.encode([query_or_dataset])[0].astype('float32')
# FAISS를 사용한 유사도 검색
D, I = index.search(query_vec.reshape(1, -1), topk)
return I[0] # 상위 k개의 문서 인덱스 반환
# 사용 예시
query = "What is the capital of France?"
top_k_indices = retrieve_faiss(query, topk=5)
# 결과 출력
for idx in top_k_indices:
print(f"Document: {documents[idx]}")