Last year, in 2020, I came across many techniques while studying AI, but Transformer, BERT, and GPT were the ones that kept coming up.
The company in Pangyo where I previously interned also promoted its chatbot by highlighting that it was built with BERT.
I came to understand the Transformer at boostcamp, so I looked up a simple implementation and tried building it myself.
I referenced the code from the site below.
https://curiousily.com/posts/sentiment-analysis-with-bert-and-hugging-face-using-pytorch-and-python/
While writing my version of the code, a freeze_support error occurred, so I moved everything into a separate run function that is called from main, and I also set the DataLoader workers to 0. During the embedding step the original code did not respect max_len, so I added the truncation=True option as well. Both changes are reflected in the full code below.
from multiprocessing import freeze_support
import transformers
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
import torch
import gdown
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
from textwrap import wrap
from torch.nn import functional as F
from torchsummary import summary as summary_
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
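# The whole pipeline lives inside run() and is only called from the __main__
# guard at the bottom; together with freeze_support(), this works around the
# multiprocessing freeze_support error mentioned above (typically seen on Windows).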
def run():
freeze_support()
sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = 12, 8
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
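    # reviews.csv is the app-review dataset from the referenced post:
    # each row holds the review text (content) and a 1-5 star rating (score).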
df = pd.read_csv("reviews.csv")
print(df.head())
print(df.shape)
print(df.info())
sns.countplot(df.score)
    plt.xlabel('review score')
plt.show()
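    # Collapse the 1-5 star ratings into three sentiment classes:
    # 1-2 -> negative (0), 3 -> neutral (1), 4-5 -> positive (2).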
def to_sentiment(rating):
rating = int(rating)
if rating <= 2 : return 0
elif rating == 3 : return 1
else : return 2
df['sentiment'] = df.score.apply(to_sentiment)
class_names = ['negative', 'neutral', 'positive']
ax = sns.countplot(df.sentiment)
plt.xlabel('review sentiment')
ax.set_xticklabels(class_names)
plt.show()
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
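    # The bert-base-cased checkpoint ships its own WordPiece vocabulary,
    # so the same name is used later for BertModel.from_pretrained.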
# sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'
#
# tokens = tokenizer.tokenize(sample_txt)
# tokens_ids = tokenizer.convert_tokens_to_ids(tokens)
#
# print(f' Sentence: {sample_txt}')
# print(f' Tokens: {tokens}')
# print(f'Token IDs: {tokens_ids}')
# print(tokenizer.sep_token, tokenizer.sep_token_id)
# print(tokenizer.cls_token, tokenizer.cls_token_id)
# print(tokenizer.pad_token, tokenizer.pad_token_id)
# print(tokenizer.unk_token, tokenizer.unk_token_id)
# encoding = tokenizer.encode_plus(
# sample_txt,
# max_length=32,
# add_special_tokens=True,
# return_token_type_ids=False,
# padding='max_length',
# return_attention_mask=True,
# return_tensors='pt'
# )
#
# print(encoding.keys())
#
# print(len(encoding['input_ids'][0]))
# print(encoding['input_ids'][0])
#
# print(len(encoding['attention_mask'][0]))
# print(encoding['attention_mask'])
#
# print(tokenizer.convert_ids_to_tokens(encoding['input_ids'][0]))
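    # Look at the token-length distribution of the reviews to pick a sensible MAX_LEN.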
token_lens = []
for txt in df.content:
tokens = tokenizer.encode(txt, max_length=512)
token_lens.append(len(tokens))
sns.displot(token_lens, kde=True)
plt.xlim([0,256])
plt.xlabel('Token count')
plt.show()
    # Most of the reviews seem to contain fewer than 128 tokens, but we'll be on the safe side and choose a maximum length of 160.
MAX_LEN = 160
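    # Dataset wrapper: each item is a single review encoded with encode_plus,
    # padded and truncated to MAX_LEN (truncation=True is the fix mentioned above).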
class GPReviewDataset(Dataset):
def __init__(self, reviews, targets, tokenizer, max_len):
self.reviews = reviews
self.targets = targets
self.tokenizer = tokenizer
self.max_len = max_len
def __len__(self):
return len(self.reviews)
def __getitem__(self, item):
review = str(self.reviews[item])
target = self.targets[item]
encoding = self.tokenizer.encode_plus(
review,
add_special_tokens=True,
max_length=self.max_len,
return_token_type_ids=False,
padding='max_length',
return_attention_mask=True,
return_tensors='pt',
truncation=True
)
return {
'review_text': review,
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'targets': torch.tensor(target, dtype=torch.long)
}
df_train, df_test = train_test_split(df, test_size=0.1, random_state=RANDOM_SEED)
df_val, df_test = train_test_split(df_test, test_size=0.5, random_state=RANDOM_SEED)
print(df_train.shape, df_val.shape, df_test.shape)
def create_data_loader(df, tokenizer, max_len, batch_size):
ds = GPReviewDataset(
reviews=df.content.to_numpy(),
targets=df.sentiment.to_numpy(),
tokenizer=tokenizer,
max_len=max_len
)
return DataLoader(
ds,
batch_size=batch_size,
num_workers=0
)
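    # Note: num_workers=0 above keeps data loading in the main process,
    # which sidesteps the freeze_support error described earlier.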
BATCH_SIZE = 32
train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)
data = next(iter(train_data_loader))
print(data.keys())
print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['targets'].shape)
# bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
#
    # output = bert_model(
    #     input_ids=encoding['input_ids'],
    #     attention_mask=encoding['attention_mask']
    # )
    # last_hidden_state = output.last_hidden_state
    # pooled_output = output.pooler_output
# print(last_hidden_state.shape)
# print(bert_model.config.hidden_size)
# print(pooled_output.shape)
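    # Classifier head: BERT's pooled [CLS] output -> dropout -> linear layer
    # over the three sentiment classes.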
class SentimentClassifier(nn.Module):
def __init__(self, n_classes):
super(SentimentClassifier, self).__init__()
self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
self.drop = nn.Dropout(p=0.3)
            # forward() below relies on self.out, so the linear output layer stays in
            self.out = nn.Linear(self.bert.config.hidden_size, n_classes)
def forward(self, input_ids, attention_mask):
output = self.bert(
input_ids=input_ids,
attention_mask=attention_mask
)
output = self.drop(output.pooler_output)
return self.out(output)
model = SentimentClassifier(len(class_names))
model = model.to(device)
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)
print(input_ids.shape) # batch size x seq length
print(attention_mask.shape) # batch size x seq length
EPOCHS = 10
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
optimizer,
num_warmup_steps=0,
num_training_steps=total_steps
)
loss_fn = nn.CrossEntropyLoss().to(device)
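    # One training epoch: forward pass, cross-entropy loss, gradient clipping,
    # then optimizer and LR-scheduler steps.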
def train_epoch(
model,
data_loader,
loss_fn,
optimizer,
device,
scheduler,
n_examples
):
model = model.train()
losses = []
correct_predictions = 0
for d in data_loader:
input_ids = d["input_ids"].to(device)
attention_mask = d["attention_mask"].to(device)
targets = d["targets"].to(device)
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask
)
_, preds = torch.max(outputs, dim=1)
loss = loss_fn(outputs, targets)
correct_predictions += torch.sum(preds == targets)
losses.append(loss.item())
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
return correct_predictions.double() / n_examples, np.mean(losses)
def eval_model(model, data_loader, loss_fn, device, n_examples):
model = model.eval()
losses = []
correct_predictions = 0
with torch.no_grad():
for d in data_loader:
input_ids = d["input_ids"].to(device)
attention_mask = d["attention_mask"].to(device)
targets = d["targets"].to(device)
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask
)
_, preds = torch.max(outputs, dim=1)
loss = loss_fn(outputs, targets)
correct_predictions += torch.sum(preds == targets)
losses.append(loss.item())
return correct_predictions.double() / n_examples, np.mean(losses)
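    # Train for EPOCHS epochs, tracking accuracy and loss, and keep the
    # checkpoint with the best validation accuracy.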
history = defaultdict(list)
best_accuracy = 0
for epoch in range(EPOCHS):
print(f'Epoch {epoch + 1}/{EPOCHS}')
print('-' * 10)
train_acc, train_loss = train_epoch(
model,
train_data_loader,
loss_fn,
optimizer,
device,
scheduler,
len(df_train)
)
print(f'Train loss {train_loss} accuracy {train_acc}')
val_acc, val_loss = eval_model(
model,
val_data_loader,
loss_fn,
device,
len(df_val)
)
print(f'Val loss {val_loss} accuracy {val_acc}')
print()
        # the accuracies come back as tensors; convert them to plain floats so matplotlib can plot them
        history['train_acc'].append(train_acc.item())
        history['train_loss'].append(train_loss)
        history['val_acc'].append(val_acc.item())
        history['val_loss'].append(val_loss)
if val_acc > best_accuracy:
torch.save(model.state_dict(), 'best_model_state.bin')
best_accuracy = val_acc
plt.plot(history['train_acc'], label='train accuracy')
plt.plot(history['val_acc'], label='validation accuracy')
plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
    plt.ylim([0, 1])
    plt.show()
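    # Final check on the held-out test set.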
test_acc, _ = eval_model(
model,
test_data_loader,
loss_fn,
device,
len(df_test)
)
    print(test_acc.item())
if __name__ == '__main__':
run()