pytorch를 활용한 RNN 모델 구현 내용임.
(사용한 dataset: IMDB)
# torch package
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# dataset packagea
from torch.utils.data import Dataset, DataLoader
from tensorflow.keras.datasets import imdb
from sklearn.model_selection import train_test_split
# metrcis
from sklearn.metrics import accuracy_score, classification_report
# ETC
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
class SimpleRNN_cell(nn.Module):
def __init__(self, input_size, hidden_size, device):
super().__init__()
# input_size: X features(X's columns)
# if X.shape (batch_size, sequence_length, feature) then X.shape[2](feature)
self.input_size = input_size
self.hidden_size = hidden_size
self.device = device
# 파라미터 생성 및 초기화
# forward 내 tanh 인자값에 대한 연산으로 torch.matmul을 사용할 경우 아래 주석처리된 W_xh를 사용
# self.W_xh = nn.Parameter(torch.empty(self.input_size, self.hidden_size, device=self.device))
self.W_xh = nn.Parameter(torch.empty(self.hidden_size, self.input_size, device=self.device))
self.W_hh = nn.Parameter(torch.empty(self.hidden_size, self.hidden_size, device=self.device))
self.b_xh = nn.Parameter(torch.empty(self.hidden_size, device=self.device))
self.b_hh = nn.Parameter(torch.empty(self.hidden_size, device=self.device))
nn.init.xavier_uniform_(self.W_xh, gain=1.0)
nn.init.xavier_uniform_(self.W_hh, gain=1.0)
nn.init.constant_(self.b_xh, 1.)
nn.init.constant_(self.b_hh, 1.)
# tanh함수를 구현하였으나 torch cuda/cpu 연산처리 관련해서 복잡하므로 그냥 torch.tanh 사용
def tanh(self, x):
return (torch.exp(x) - torch.exp(-x))/(torch.exp(x) + torch.exp(-x))
def forward(self, x, hidden=None): # x.shape: (batch_size, sequence_length, embeding_size)
# init hidden state
if hidden is None:
hidden = torch.zeros(x.shape[0], self.hidden_size, device=self.device)
# output.shape: (batch_size, sequence_length, hidden_size)
outputs = []
for seq in range(x.shape[1]):
input_x = x[:,seq,:]
hidden = torch.tanh(F.linear(input_x, self.W_xh, self.b_xh) + F.linear(hidden, self.W_hh, self.b_hh))
outputs.append(hidden.unsqueeze(1))
outputs = torch.cat(outputs, dim=1)
return outputs
class SimpleRNN_cell(nn.Module):
def __init__(self, input_size, hidden_size, device):
super().__init__()
# input_size: X features(X's columns)
# if X.shape (batch_size, sequence_length, feature) then X.shape[2](feature)
self.input_size = input_size
self.hidden_size = hidden_size
self.device = device
# 파라미터 생성 및 초기화
# forward 내 tanh 인자값에 대한 연산으로 torch.matmul을 사용할 경우 아래 주석처리된 W_xh를 사용
# self.W_xh = nn.Parameter(torch.empty(self.input_size, self.hidden_size, device=self.device))
self.W_xh = nn.Parameter(torch.empty(self.hidden_size, self.input_size, device=self.device))
self.W_hh = nn.Parameter(torch.empty(self.hidden_size, self.hidden_size, device=self.device))
self.b_xh = nn.Parameter(torch.empty(self.hidden_size, device=self.device))
self.b_hh = nn.Parameter(torch.empty(self.hidden_size, device=self.device))
nn.init.xavier_uniform_(self.W_xh, gain=1.0)
nn.init.xavier_uniform_(self.W_hh, gain=1.0)
nn.init.constant_(self.b_xh, 1.)
nn.init.constant_(self.b_hh, 1.)
input_size
: 입력 데이터 X의 feature_size임.hidden_size
: hyperparameter로 사용자가 적합한 수치를 설정device
: 사용자가 연산할 하드웨어 설정 (cuda or cpu)
- 필요한 가중치
- 필요한 편향
- 위 RNN Cell은 를 출력값으로 그대로 사용했지만 필요에 따라서 에 추가적인 연산을 진행하여 출력할 수도 있다.
input_size를 정하는 부분에서 어려움을 겪을 수 있다.
input_size의 shape이 (batch_size, sequence_length, feature)일 경우 input_size는 feature값이 된다.
hidden_size는 hyperparameter이기 때문에 임의로 설정하면 된다.
해당 포스트에는 나오지 않았으나 전체 코드에서는 input data가 Embedding layer를 통과하기 때문에 Embedding layer를 통과한 input data의 shape은 (batch_size, sequence_length, embedding_dim)이 되며 embedding_dim이 RNN Cell의 input_size가 된다.
추가로 forward 연산시 F.linear
연산을 해야하므로 input_data
와 연산이 되는 W_xh
가중치의 shape은 (hidden_size, input_size)로 한다.
F.linear(X, W, b)
: X @ W.T + b프레임워크에서 backpropagation을 통한 가중치 업데이트가 가능하도록
nn.Parameter 메소드를 통해 가중치와 편향을 선언해주고
tanh 연산에 적합한 xavier_uniform으로 가중치를 초기화해준다.
def tanh(self, x):
return (torch.exp(x) - torch.exp(-x))/(torch.exp(x) + torch.exp(-x))
torch.tanh
를 사용함.# init hidden state
if hidden is None:
hidden = torch.zeros(x.shape[0], self.hidden_size, device=self.device)
# output.shape: (batch_size, sequence_length, hidden_size)
outputs = []
for seq in range(x.shape[1]):
input_x = x[:,seq,:]
hidden = torch.tanh(F.linear(input_x, self.W_xh, self.b_xh) + F.linear(hidden, self.W_hh, self.b_hh))
outputs.append(hidden.unsqueeze(1))
outputs = torch.cat(outputs, dim=1)
return outputs
outputs
: Recurrent 연산을 진행하면서 생성된 각 셀의 output들의 모음이다.for문을 통해 각 sequence_length만큼 hidden_state를 연산하여 outputs list에 추가함.
여기서 sequence_length는 문장의 길이를 의미함. (정확하게는 embedding이 된 문장벡터)
이후 반복문을 통해 연산된 hidden_state를 torch.Tensor로 변환한 뒤 반환해줌.