1) 먼저 05 NLP Basic "Text to Tensor" 내용을 숙지하고 이 포스트를 보자.
2) 오늘은 Text Classification을 할 예정.
3) Basic Flow 중심
: 이 포스트의 주된 목적은 "Basic Flow"에 있다.
: Kaggle API -> Mecab 설치 -> Import -> Data -> Dataset -> DataLoader -> Model -> Loss Function and Optimizer -> train_one_epoch() and valid_one_epoch() -> run_train()
: Colab Cell에서 다음과 같이 입력하여 Kaggle Dataset을 한 번에 다운로드할 수 있다.
'KAGGLE_USERNAME', 'KAGGLE_KEY'의 경우, Kaggle에서 'Your Profile' -> 'Account' 탭 -> 'API'에서 얻을 수 있다.
import os
# os.environ을 이용하여 Kaggle API Username, Key 세팅하기
os.environ['KAGGLE_USERNAME'] = ############
os.environ['KAGGLE_KEY'] = ############
# Linux 명령어로 Kaggle API를 이용하여 데이터셋 다운로드하기 (!kaggle ~)
!kaggle datasets download -d heiswicked/dacon-shoppingmall-reviews-classification
# Linux 명령어로 압축 해제하기
!unzip '*.zip'
압축 해제가 끝났다면, 어떤 파일들이 있는지 살펴보자.
!ls
import re
import os
import gc
import time
import random
import string
import copy
from copy import deepcopy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# Utils
from tqdm import tqdm
## Pytorch Import
import torch
import torch.nn as nn
# from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
# Suppress warnings
import warnings
warnings.filterwarnings("ignore")
# For descriptive error messages
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
: nn을 이용해서 nn.Module을 상속받아서 다음 예시처럼 Model을 빌드한다. 그래서 필요하다.
class Model(nn.Module):
def __init__(self, input_dim = 8, output_dim = 1):
super().__init__()
self.fc1 = nn.Linear()
...
: Dataset과 DataLoader는 데이터를 배치 단위로 학습할 데이터(x)와 정답 데이터(y)를 묶어서 뱉어주는 역할을 한다.
: DACON Basic "쇼핑몰 리뷰 평점 분류 경진대회" 데이터셋을 사용한다.
base_path = '/content/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv'
df = pd.read_csv(base_path)
df = df[df.Open.notnull()].reset_index(drop = True)
# 결측치 제거 및 index 초기화
num = int(df.shape[0] * .01)
print(num)
df = df[:num] # 1%의 데이터만 사용!
print(df.shape)
df.head()
: 'target' class인 2, 1, 5, 4를 0, 1, 2, 3으로 인코딩해준다. 이 때 인코딩된 값들은 'new_target'인 새로운 컬럼에 넣어주도록 한다.
from sklearn.preprocessing import LabelEncoder
encoder = LabelEncoder()
train['new_target'] = encoder.fit_transform(train['target'])
: mecab.morphs() 로 Tokenize 해도 무방하지만, 아래 함수는 KDT AI 수강생 시절 김용담 강사님이 주신 코드이다. 아래 코드의 이점은 정규식이 포함되어있다는점과 토크나이징한 결과 중 불용어와 특정 품사를 제외할 수 있다는 점이다.
import re
from konlpy.tag import Mecab
mecab = Mecab()
predefined_pos = ["NNG", "NNP", "NNB", "NNBC", "NR", "NP",
"VV",
"VA", "VX", "VCP", "VCN",
"MM", "MAG", "MAJ"]
def text_pre(text, tokenizer = 'morphs'):
# 1. Cleaning
# 밑에 있는 cleaning 코드는 3개를 다 써도 되고, 일부만 사용 가능.
#text = re.sub("[^ㄱ-ㅎㅏ-ㅣ가-힣 ]", "", text) # 한국어 빼고 다 지우기
text = re.sub("[\{\}\[\]\/?.,;:|\)*~`!^\-_+<>@\#$%&\\\=\(\'\"]", "", text) # 특수문자 다 지우기
#text = re.sub(["A-Za-z"], "", text) # 영어 다 지우기
if tokenizer =='word':
tokens = text.split()
elif tokenizer =='nouns':
tokens = mecab.nouns(text)
elif tokenizer =='morphs':
tokens = mecab.morphs(text)
elif tokenizer =='predefined':
tokens = []
temp = mecab.pos(text)
for token, pos in temp:
if pos in predefined_pos:
tokens.append(token)
## 3. Stop words
SW = set()
SW.add("불용어")
result = [token for token in tokens if token not in SW]
return result
아래 예시를 참고하길 바란다.
: 이 Task에서는 text_pre(sentence, tokenizer = 'morphs') 로 진행할 예정.
def getbow(corpus):
# corpus: [sentence1, sentence2, ....]
bow = {'<PAD>': 0, '<BOS>': 1, '<EOS>':2}
for line in corpus:
for tok in text_pre(line):
if tok not in bow.keys():
bow[tok] = len(bow.keys())
return bow
# train 데이터에 있는 reviews 데이터만으로 getbow를 진행
korbow = getbow(train.reviews.to_list())
len(korbow.keys())
: DataLoader에서 뱉어줄 sl(=Sequence Length)를 결정하기 위해 필요한 것이 Max Length이다. 여기서 reviews 의 문장 중 가장 긴 문장을 기준으로 패딩을 진행할 예정이기 때문에 필요하다.
max_length = 0
length_list = []
for num in range(train.shape[0]):
length = len(text_pre(train.loc[num, 'reviews']))
length_list.append(length)
if length > max_length:
max_length = length
max_length
max_length가 82가 나오지만, 필자는 sl을 90으로 진행하였다. (BOS, EOS 토큰을 생각하면 84로 진행해도 된다.)
sl = 90
class MyDataset(Dataset):
def __init__(self,
df = train,
korbow = korbow,
tokenizer = text_pre,
sl = sl):
self.x = df.reviews
self.y = df.new_target.values
self.tokenizer = tokenizer
self.korbow = korbow
self.sl = sl
def __len__(self):
return self.x.shape[0]
def make_sentence(self, sentence):
# sentence = "나는 학교에 간다"
x = self.tokenizer(sentence) # list
# x: ["나는", "학교", "에", "간-", "다"]
x = ['<BOS>'] + x + ['<EOS>']
# x: ['<BOS>', "나는", "학교", "에", "간-", "다", '<EOS>']
# 14 나머지 50은 PAD 로 채워줘야
x += ['<PAD>'] * (self.sl - len(x))
# x: ['<BOS>', "나는", "학교", "에", "간-", "다", '<EOS>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', ...]
x = np.array([self.korbow[word] for word in x])
# x = [1, 3, 442, 23, 11, 345, 2, 0, 0, 0, 0, ....]
return x
def __getitem__(self, idx):
sen = self.x[idx] # 문장 하나가 특정이 된다.
x = self.make_sentence(sen)
y = self.y[idx] # 숫자하나(= label)
return x, y # np.array
: 여기서는 학습에 사용할 데이터와 성능 검증에 필요한 데이터로 쪼개준다. 그리고 각 데이터에 대해서 배치 단위로 뱉어줄 수 있도록 DataLoader 객체를 각각 만들어준다. 이 과정을 prepare_loaders() 함수에 담았다.
def prepare_loaders(df = train, index_num = 18000, bs = 2*64):
# train, valid split
train_df = df[:index_num].reset_index(drop = True)
valid_df = df[index_num:].reset_index(drop = True)
# train_ds, valid_ds
train_ds = MyDataset(df = train_df)
valid_ds = MyDataset(df = valid_df)
# train_loader, valid_loader
train_loader = DataLoader(train_ds, batch_size = bs, shuffle= True)
valid_loader = DataLoader(valid_ds, batch_size = bs, shuffle= False)
print("DataLoader Completed")
return train_loader, valid_loader
train_loader, valid_loader = prepare_loaders()
: Data의 Shape 추적이 가장 중요하다. 그래서 여기서도 확인해볼 필요가 있다. 아마 x에 해당하는 부분의 Shape은 [128, 90], y에 해당하는 부분의 Shape은 [128] 으로 나올 것이다.
## train_loader가 뱉는 배치 크기 확인해보자
data = next(iter(train_loader))
data[0].shape, data[1].shape
: torch.tensor를 비롯해서 train_loader, valid_loader에서 나오는 배치들과 Model의 layer들을 모두 GPU로 보내기 위한 코드이다. M1, Colab에서 적용할 수 있는 코드이며, '지금 GPU 쓸 수 있니? 없니?' 라고 확인 후, if else 조건문을 통해 GPU로 보내는 코드이다.
# Colab
# device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
# M1 버전
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
device
: nn.Module을 상속받아서 Model 클래스를 빌드한다.
: RNN 계열의
class Model(nn.Module):
def __init__(self,
input_dim = len(korbow.keys()),
emb_dim = 512,
hidden_size = 1024,
num_layers = 2,
sl = 2*64,
dropout = .1,
device = device):
super().__init__()
self.device = device
self.dropout = nn.Dropout(dropout)
self.nl = num_layers
self.hs = hidden_size
self.sl = sl
self.emb = nn.Embedding(input_dim, emb_dim)
self.rnn = nn.GRU(input_size = emb_dim,
hidden_size = self.hs,
num_layers = self.nl,
batch_first = True,
bidirectional = False)
# input's shape: [bs, sl, emb_dim]
# input(h_0)'s shape: [nl, bs, hidden_size]
# output's shape: [bs, sl, hidden_size]
# output(h_out)'s shape: [nl, bs, hidden_size]
# [bs, sl, hidden_size] -> [bs, k]
k = self.sl * self.hs
# [bs, k] -> [bs, 768] -> [bs, 4]
self.seq = nn.Sequential(nn.Linear(k, 768),
nn.LeakyReLU(),
nn.Linear(768, 4),
nn.LogSoftmax(dim=-1))
def forward(self, x):
# x: [bs, sl]
x = self.emb(x) # [bs, sl] -> [bs, sl, emb_dim]
h_0 = torch.zeros(self.nl, x.shape[0], self.hs).to(self.device)
# h_0 : [nl, bs, hidden_size]
# x: [bs, sl, emb_dim]
output, _ = self.rnn(x, h_0)
# output's shape: [bs, sl, hidden_size]
output = output.reshape(output.shape[0], -1)
output = self.seq(output)
# output: [bs, 4]
return output
model = Model().to(device) # GPU로 보내준다.
# loss_fn도 to(device)가 가능하다. (= GPU로 보내줄 수 있다.)
loss_fn = nn.NLLLoss().to(device)
# optimizer는 to(device)가 불가능하다. (= GPU로 보내줄 수 없다.)
optimizer = torch.optim.Adam(model.parameters()) # lr = 1e-3
: model이 한 epoch를 도는 동안, model이 학습되고 또한 train_epoch_loss(=epoch당 평균 train loss)와 train_acc(=epoch당 Accuracy)를 return 시키는 함수이다.
# train_loss => 실시간 단위로 구할 예정
# accuracy => 실시간 단위로 구할 예정
# from tqdm import tqdm
epoch = 1
def train_one_epoch(model = model,
dataloader = train_loader,
loss_fn = loss_fn,
optimizer = optimizer,
device = device,
epoch = epoch):
model.train()
train_loss, dataset_size = 0, 0
preds, trues = [], []
bar = tqdm(dataloader, total = len(dataloader))
for data in bar:
x = data[0].to(device)
y_true = data[1].to(device)
y_pred = model(x)
loss = loss_fn(y_pred, y_true)
optimizer.zero_grad()
loss.backward()
optimizer.step()
bs = x.shape[0]
dataset_size += bs
train_loss += (loss.item() * bs)
train_epoch_loss = train_loss / dataset_size
preds.append(y_pred)
trues.append(y_true)
preds_cat = torch.cat(preds, dim = 0)
trues_cat = torch.cat(trues, dim = 0)
train_acc = 100*(torch.argmax(preds_cat, dim =-1) == trues_cat).sum().item() / dataset_size
bar.set_description(f"Epoch{epoch:02d}|TL:{train_epoch_loss:.3e}|ACCURACY:{train_acc:.2f}")
return train_epoch_loss, train_acc
: model이 한 epoch를 도는 동안, valid_epoch_loss(=epoch당 평균 valid loss)와 valid_acc(=epoch당 Accuracy)를 return 시키는 함수이다.
# @torch.no_grad() : 데코레이터 형태의 'model을 학습 시키지 않겠다는 필수 의지 표명 2'
# --> 근데 함수형일 때 쓸 수 있는 것으로 알고 있다.
# --> model을 학습 시키지 않겠다는 필수 의지 표명 1과 2 중 하나만 써도 무방하다.
# valid_loss => 실시간 단위로 구할 예정
# accuracy => 실시간 단위로 구할 예정
# from tqdm import tqdm
epoch = 1
@torch.no_grad()
def valid_one_epoch(model = model,
dataloader = valid_loader,
loss_fn = loss_fn,
device = device,
epoch = epoch):
model.eval()
valid_loss, dataset_size = 0, 0
preds, trues = [], []
bar = tqdm(dataloader, total = len(dataloader))
with torch.no_grad():
for data in bar:
x = data[0].to(device)
y_true = data[1].to(device)
y_pred = model(x)
loss = loss_fn(y_pred, y_true)
bs = x.shape[0]
dataset_size += bs
valid_loss += (loss.item() * bs)
valid_epoch_loss = valid_loss / dataset_size
preds.append(y_pred)
trues.append(y_true)
preds_cat = torch.cat(preds, dim = 0)
trues_cat = torch.cat(trues, dim = 0)
valid_acc = 100*(torch.argmax(preds_cat, dim =-1) == trues_cat).sum().item() / dataset_size
bar.set_description(f"Epoch{epoch:02d}|VL:{valid_epoch_loss:.3e}|ACCURACY:{valid_acc:.2f}")
return valid_epoch_loss, valid_acc
: 여기서는 전반적인 train 과정을 담았다.
def run_train(model = model,
loss_fn = loss_fn,
optimizer = optimizer,
train_loader = train_loader,
valid_loader = valid_loader):
n_epochs = 25
print_iter =10
lowest_loss, lowest_epoch = np.inf, np.inf
early_stop = 30
train_hs, valid_hs = [], [] # visualization
train_accs, valid_accs = [], [] # visualization
for epoch in range(n_epochs):
train_loss, train_acc = train_one_epoch(model = model, dataloader = train_loader, loss_fn = loss_fn, optimizer = optimizer,
device = device, epoch = epoch)
valid_loss, valid_acc = valid_one_epoch(model = model, dataloader = valid_loader, loss_fn = loss_fn,
device = device, epoch = epoch)
# 줍줍
train_hs.append(train_loss)
valid_hs.append(valid_loss)
train_accs.append(train_acc)
valid_accs.append(valid_acc)
if (epoch + 1) % print_iter == 0:
print()
print(f"Ep:[{epoch + 1:02d}]|TL:{train_loss:.4e}|VL:{valid_loss:.4e}|LL:{lowest_loss:.4e}|")
print()
# Lowest Loss 갱신 - valid_loss 기준
if valid_loss < lowest_loss:
lowest_loss = valid_loss
lowest_epoch = epoch
# model 저장
torch.save(model.state_dict(), './model.bin') # pt, pth
else:
if early_stop > 0 and lowest_epoch + early_stop < epoch + 1:
print("넌 삽 질 중")
break
print()
print("The Best Validation Loss=%.4e at %d Epoch" % (lowest_loss, lowest_epoch))
# model load
model.load_state_dict(torch.load('./model.bin'))
result = dict()
result["Train Loss"] = train_hs
result["Valid Loss"] = valid_hs
result["Train Acc"] = train_accs
result["Valid Acc"] = valid_accs
return result, model
: 다음 코드로 학습 시작!
model, result = run_train()
## Visualization: Train Loss, Valid Loss
plot_from = 0
plt.figure(figsize=(20, 10))
plt.title("Train/Valid Loss History", fontsize = 20)
plt.plot(
range(0, len(result['Train Loss'][plot_from:])),
result['Train Loss'][plot_from:],
label = 'Train Loss'
)
plt.plot(
range(0, len(result['Valid Loss'][plot_from:])),
result['Valid Loss'][plot_from:],
label = 'Valid Loss'
)
plt.legend()
plt.yscale('log')
plt.grid(True)
plt.show()
## Visualization: Train Accuracy, Valid Accuracy
plot_from = 0
plt.figure(figsize=(20, 10))
plt.title("Train/Valid Accuracy History", fontsize = 20)
plt.plot(
range(0, len(result['Train Acc'])),
result['Train Acc'],
label = 'Train Acc'
)
plt.plot(
range(0, len(result['Valid Acc'])),
result['Valid Acc'],
label = 'Valid Acc'
)
plt.legend()
# plt.yscale('log')
plt.grid(True)
plt.show()