※ KoBERT application reference - [GitHub]
# KoBERT_finetuning_test.ipynb
```python
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm.notebook import tqdm, tqdm_notebook
from kobert.utils import get_tokenizer
from kobert.pytorch_kobert import get_pytorch_kobert_model
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup
import pandas as pd
import sklearn

## CPU
device = torch.device("cpu")
## GPU
# device = torch.device("cuda:0")
```
# data setting
```python
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")

data = pd.read_csv('./train_dataset.csv')
data['label'].value_counts()   # check class balance (the original referenced an undefined `data2`)
```
```python
df = sklearn.utils.shuffle(data)
df = df.dropna()                       # df[df.notnull()] does not drop rows; dropna() does
df['conts'] = df['conts'].astype(str)
df.drop_duplicates(subset='conts', inplace=True)
df = df.reset_index(drop=True)
data = df.copy()
```
```python
# Build [text, label] pairs; avoid rebinding `data` inside the loop,
# which would shadow the DataFrame.
data_list = []
for q, label in zip(data['conts'], data['label']):
    data_list.append([q, str(label)])
```
```python
print(f'Dataset length : {len(data_list)}')

train_data = data_list[:58298]
test_data = data_list[58298:]
```
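The hard-coded index 58298 presumably just matches the size of this particular dataset. As an alternative (an assumption on my part, not in the original notebook), a proportional split with scikit-learn keeps the ratio fixed regardless of dataset size:

```python
# Sketch (assumption): an 80/20 split that adapts to the dataset size.
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(data_list, test_size=0.2, random_state=42)
```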
# BERT parameter setting
```python
class BERTDataset(Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, max_len, pad, pair):
        transform = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, pad=pad, pair=pair)
        self.sentences = [transform([i[sent_idx]]) for i in dataset]
        self.labels = [np.int32(i[label_idx]) for i in dataset]

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))

    def __len__(self):
        return (len(self.labels))


## Setting parameters
max_len = 64
batch_size = 64
warmup_ratio = 0.1
num_epochs = 5
max_grad_norm = 1
log_interval = 200
learning_rate = 5e-5
```
# Tokenizing text with the KoBERT SentencePiece tokenizer (BERTSPTokenizer)
```python
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

data_train = BERTDataset(train_data, 0, 1, tok, max_len, True, False)
data_test = BERTDataset(test_data, 0, 1, tok, max_len, True, False)

train_dataloader = torch.utils.data.DataLoader(data_train, batch_size=batch_size, num_workers=5)
test_dataloader = torch.utils.data.DataLoader(data_test, batch_size=batch_size, num_workers=5)
```
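For reference, each BERTDataset item is a (token_ids, valid_length, segment_ids, label) tuple, which is why the loops below unpack four values per batch. A quick sanity check (illustrative only, not in the original) could look like:

```python
# Sketch: BERTSentenceTransform yields (token_ids, valid_length, segment_ids);
# BERTDataset appends the integer label as the fourth element.
token_ids, valid_length, segment_ids, label = data_train[0]
print(token_ids.shape, int(valid_length), segment_ids.shape, label)
```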
# Declare the BERTClassifier class
```python
class BERTClassifier(nn.Module):
    def __init__(self,
                 bert,
                 hidden_size=768,
                 num_classes=2,   ## adjust to the number of target classes
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        _, pooler = self.bert(input_ids=token_ids,
                              token_type_ids=segment_ids.long(),
                              attention_mask=attention_mask.float().to(token_ids.device))
        # Without this fallback, `out` is undefined when dr_rate is None.
        out = self.dropout(pooler) if self.dr_rate else pooler
        return self.classifier(out)


# set BertModel
model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)

# Prepare optimizer and schedule (warmup + cosine decay)
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
     'weight_decay': 0.01},
    {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
     'weight_decay': 0.0}
]

# set Optimizer
optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()

t_total = len(train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)

scheduler = get_cosine_schedule_with_warmup(optimizer,
                                            num_warmup_steps=warmup_step,
                                            num_training_steps=t_total)


def calc_accuracy(X, Y):
    max_vals, max_indices = torch.max(X, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy() / max_indices.size()[0]
    return train_acc
```
# Train Model
```python
for e in range(num_epochs):
    train_acc = 0.0
    test_acc = 0.0

    model.train()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(train_dataloader)):
        optimizer.zero_grad()
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        loss = loss_fn(out, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()  # Update learning rate schedule
        train_acc += calc_accuracy(out, label)
        if batch_id % log_interval == 0:
            print("epoch {} batch id {} loss {} train acc {}".format(
                e + 1, batch_id + 1, loss.data.cpu().numpy(), train_acc / (batch_id + 1)))
    print("epoch {} train acc {}".format(e + 1, train_acc / (batch_id + 1)))

    model.eval()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(test_dataloader)):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        test_acc += calc_accuracy(out, label)
    print("epoch {} test acc {}".format(e + 1, test_acc / (batch_id + 1)))
```
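The evaluation pass does not need gradients, so it could be wrapped in torch.no_grad() to save memory and time. A minimal sketch (an addition on my part, not part of the original notebook):

```python
# Sketch (assumption, not in the original): gradient-free evaluation pass.
model.eval()
test_acc = 0.0
with torch.no_grad():
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        test_acc += calc_accuracy(out, label)
print("test acc {}".format(test_acc / (batch_id + 1)))
```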
# Save Model and Predict new data
```python
# model save
torch.save(model.state_dict(), './BERTmodel.pt')

# model load: rebuild the classifier and load the saved state_dict
# (the original also assigned torch.load(...) to load_model and then
# immediately overwrote it, so that line is dropped here)
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")
load_model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)
load_model.load_state_dict(torch.load('./BERTmodel.pt', map_location=device))


def predict_with_load_model(predict_sentence):
    data = [predict_sentence, '0']          # dummy label so BERTDataset can be reused
    dataset_another = [data]

    another_test = BERTDataset(dataset_another, 0, 1, tok, max_len, True, False)
    test_dataloader = torch.utils.data.DataLoader(another_test, batch_size=batch_size, num_workers=5)

    load_model.eval()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        label = label.long().to(device)

        out = load_model(token_ids, valid_length, segment_ids)

        test_eval = []
        for i in out:
            logits = i.detach().cpu().numpy()
            if np.argmax(logits) == 0:
                test_eval.append("정상")       # "normal"
            elif np.argmax(logits) == 1:
                test_eval.append("무성의글")   # "insincere post"

        print(f">> 입력하신 내용은 {test_eval[0]} 입니다.")  # ">> Your input is classified as <label>."
```
# Run
```python
end = 1
while end == 1:
    sentence = input("하고싶은 말을 입력해주세요 : ")  # "Enter the text you want to check: "
    if sentence.endswith('0'):   # type 0 to quit
        break
    predict_with_load_model(sentence)
    print("\n")
```