pytorch로 GRU 구현하기

정예슬·2022년 6월 22일
0

pytorch

목록 보기
14/14
post-thumbnail

GRU(gate recurrent unit)


GRU(게이트 순환 유닛)는 LSTM에서 사용하는 망각 게이트와 입력 게이트를 하나로 합친 것이며, 별도의 업데이트 게이트로 구성되어 있다.

하나의 게이트 컨트롤러(gate controller)가 망각 게이트와 입력 게이트를 모두 제어한다. 게이트 컨트롤러가 1을 출력하면 망각 게이트가 열리고 입력 게이트는 닫히며, 반대로 0을 출력하면 망각 게이트는 닫히고 입력 게이트가 열린다. 즉, 이전 기억이 저장될 때마다 단계별 입력이 삭제된다.

GRU는 출력 게이트가 없어 전체 상태 벡터가 매 단계마다 출력되며, 이전 상태 어느 부분이 출력될 지 제어하는 새로운 게이트 컨트롤러가 별도로 존재한다.

  • 망각 게이트(reset gate)
    과거 정보를 적당히 초기화(reset)시키기 위해 시그모이드 함수를 출력으로 이용하여 (0, 1)값을 이전 은닉층에 곱한다. 이전 시점 은닉층 값에 현 시점의 정보에 대한 가중치를 곱한 것이다.

망각 게이트의 수식은 다음과 같다.

rt=σ(Wr×[ht1,xt])r_t = σ(W_r \times [h_{t-1}, x_t])

  • 업데이트 게이트(update gate)

    과거와 현재 정보의 최신화 비율을 결정하는 역할을 한다. 시그모이드로 출력된 결과(ztz_t)는 현 시점의 정보량을 결정하고 1에서 뺀 값(1zt1-z_t)을 직전 시점 은닉층 정보와 곱한다.

    업데이트 게이트의 수식은 다음과 같다.

    zt=σ(Wz×[ht1,xt])z_t = σ(W_z \times [h_{t-1}, x_t])

  • 후보군(candidate)

    현 시점의 정보에 대한 후보군을 계산한다. 과거 은닉층 정보를 그대로 이용하지 않고 망각 게이트의 결과를 이용하여 후보군을 계산한다.

    후보군 계산 수식은 다음과 같다.

    h~t=tanh(W×[rtht1,xt])\tilde h_t = tanh(W\times [r_t * h_{t-1}, x_t])

  • 은닉층 계산

    업데이트 게이트 결과후보군 결과를 결합하여 현 시점 은닉층을 계산한다. 시그모이드 함수의 결과는 현 시점에서 결과에 대한 정보량을 결정하고, 1-시그모이드 함수의 결과는 과거 정보량을 결정한다.

    은닉층 계산 수식은 다음과 같다.

    ht=(1zt)ht1+zt×h~th_t = (1 - z_t) * h_{t-1} + z_t \times \tilde h_t


GRU 구현(pytorch)

앞선 LSTM 편의 계층 구현과 동일한 데이터, 방식으로 진행한다. 셀과 네트워크만 GRU로 변경한 것!

GRU Cell

class GRUCell(nn.Module) :
    def __init__(self, input_size, hidden_size, bias=True) :
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.x2h = nn.Linear(input_size, 3 * hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, 3*hidden_size, bias=bias)
        self.reset_parameters()
        
    def reset_parameters(self) :
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters() : 
            w.data.uniform_(-std, std)
            
    def forward(self, x, hidden) : 
        x = x.view(-1, x.size(1))
        
        gate_x = self.x2h(x)
        gate_h = self.h2h(hidden)
        
        gate_x = gate_x.squeeze()
        gate_h = gate_h.squeeze()
        
        i_r, i_i, i_n = gate_x.chunk(3, 1)
        h_r, h_i, h_n = gate_h.chunk(3, 1)
        
        resetgate = F.sigmoid(i_r + h_r)
        inputgate = F.sigmoid(i_i + h_i)
        newgate = F.tanh(i_n + (resetgate * h_n))
        
        hy = newgate + inputgate * (hidden - newgate)
        return hy

LSTM 셀에서는 hidden size에 4를 곱하지만, GRU 셀에서는 2개의 게이트(망각, 입력 게이트)와 탄젠트 활성화 함수가 사용되므로 3을 곱한다.

GRU Model Network

import torch
import torch.nn
import torchvision.transforms as transforms
import torchvision.datasets
from torch.autograd import Variable
from torch.nn import Parameter
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
import math

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

cuda = True if torch.cuda.is_available() else False
Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor

torch.manual_seed(125)

if torch.cuda.is_available() :
    torch.cuda.manual_seed_all(125)
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, ), (1.0, )) # 정규화    
])

from torchvision.datasets import MNIST
download_root = '../080289-main/chap07/MNIST_DATASET'

train_dataset = MNIST(download_root, transform=mnist_transform, train=True,
                    download=True)
valid_dataset = MNIST(download_root, transform=mnist_transform, train=False,
                     download=True)
test_dataset = MNIST(download_root, transform=mnist_transform, train=False,
                    download=True)
batch_size=64
train_loader = DataLoader(dataset=train_dataset,
                         batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset,
                        batch_size=batch_size, shuffle=True)
batch_size=100
n_iters=6000
num_epochs = n_iters/(len(train_dataset)/batch_size)
num_epochs = int(num_epochs)
class GRUModel(nn.Module) :
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, bias=True) :
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.gru_cell = GRUCell(input_dim, hidden_dim, layer_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x) :
        if torch.cuda.is_available() :
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
        else :
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
            
        outs = []
        hn = h0[0, :, :]
        
        for seq in range(x.size(1)) : 
            hn = self.gru_cell(x[:, seq, :], hn)
            outs.append(hn)
        out = outs[-1].squeeze()
        out = self.fc(out)
        return out
# 옵티마이저, 손실 함수
input_dim = 28
hidden_dim = 128
layer_dim = 1
output_dim = 10

model = GRUModel(input_dim, hidden_dim, layer_dim, output_dim)

if torch.cuda.is_available() :
    model.cuda()
    
criterion = nn.CrossEntropyLoss()
learning_rate = 0.1

optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# 학습 및 검증
seq_dim = 28
loss_list = []
iter = 0
for epoch in range(num_epochs) :
    for i, (images, labels) in enumerate(train_loader) :
        if torch.cuda.is_available() :
            images = Variable(images.view(-1, seq_dim, input_dim).cuda())
            labels = Variable(labels.cuda())
            
        else :
            images = Variable(images.view(-1, seq_dim, input_dim))
            labels = Variable(labels)
            
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        if torch.cuda.is_available() : 
            loss.cuda()
            
        loss.backward()
        optimizer.step()
        
        loss_list.append(loss.item())
        iter += 1
        
        if iter % 500 == 0 :
            correct = 0
            total = 0
            for images, labels in valid_loader : 
                if torch.cuda.is_available() :
                    images = Variable(images.view(-1, seq_dim, input_dim).cuda())
                else :
                    images = Variable(images.view(-1, seq_dim, input_dim))
                    
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                
                if torch.cuda.is_available() :
                    correct += (predicted.cpu() == labels.cpu()).sum()
                else :
                    correct += (predicted == labels).sum()
                    
            accuracy = 100 * correct / total
            print(f'Iteration : {iter}. Loss : {loss.item()}. Accuracy : {accuracy}')
def evaluate(model, val_iter) : 
    corrects, total, total_loss = 0, 0, 0
    model.eval()
    for images, labels in val_iter :
        if torch.cuda.is_available() :
            images = Variable(images.view(-1, seq_dim, input_dim).cuda())
        else :
            images = Variable(images.view(-1, seq_dim, input_dim)).to(device)
            
        logit = model(images).to(device)
        loss = F.cross_entropy(logit, labels, reduction='sum')
        _, predicted = torch.max(logit.data, 1)
        total += labels.size(0)
        total_loss += loss.item()
        corrects += (predicted == labels).sum()
        
    avg_loss = total_loss / len(val_iter.dataset)
    avg_accuracy = corrects / total
    return avg_loss, avg_accuracy
test_loss, test_acc = evaluate(model, test_loader)
print(f'Test Loss : {test_loss:5.2f} | Test Accuracy : {test_acc:5.2f}')

GRU 계층 구현

앞선 LSTM 편의 계층 구현과 동일한 데이터, 방식으로 진행한다.

import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.autograd import Variable
from tqdm import tqdm_notebook
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
device =torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
data = pd.read_csv('../080289-main/chap07/data/SBUX.csv')
print(data.dtypes)
data['Date'] = pd.to_datetime(data['Date'])
data.set_index('Date', inplace=True)
data['Volume'] = data['Volume'].astype(float)
X = data.iloc[:, :-1]
y = data.iloc[:, 5:6]
print(X)
print(y)
ms = MinMaxScaler()
ss = StandardScaler()

X_ss = ss.fit_transform(X)
y_ms = ms.fit_transform(y)

X_train = X_ss[:200, :]
X_test = X_ss[200:, :]

y_train = y_ms[:200, :]
y_test = y_ms[200:, :]

print('Training Shape :', X_train.shape, y_train.shape)
print('Testing Shape :', X_test.shape, y_test.shape)
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))

y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))

X_train_tensors_f = torch.reshape(X_train_tensors, 
                                  (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))

X_test_tensors_f = torch.reshape(X_test_tensors,
                                (X_test_tensors.shape[0], 1, X_test_tensors.shape[1]))

print('Training Shape :', X_train.shape, y_train.shape)
print('Testing Shape :', X_test.shape, y_test.shape)
class GRU(nn.Module) :
    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length) :
        super(GRU, self).__init__()
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.seq_length = seq_length
        
        self.gru = nn.GRU(input_size=input_size,hidden_size=hidden_size,
                         num_layers=num_layers,batch_first=True)
        self.fc_1 = nn.Linear(hidden_size, 128)
        self.fc = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()
        
    def forward(self, x) :
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        output, (hn) = self.gru(x, (h_0))
        hn = hn.view(-1, self.hidden_size)
        out = self.relu(hn)
        out = self.fc_1(out)
        out = self.relu(out)
        out = self.fc(out)
        return out
num_epochs = 1000
learning_rate = 0.0001

input_size=5
hidden_size=2
num_layers=1

num_classes=1
model=GRU(num_classes,input_size,hidden_size,num_layers,X_train_tensors_f.shape[1])

criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for epoch in range(num_epochs) :
    outputs = model.forward(X_train_tensors_f)
    optimizer.zero_grad()
    loss = criterion(outputs, y_train_tensors)
    loss.backward()
    
    optimizer.step()
    if epoch % 100 == 0 :
        print(f'Epoch : {epoch}, loss : {loss.item():1.5f}')
df_x_ss = ss.transform(data.iloc[:, :-1])
df_y_ms = ms.transform(data.iloc[:, -1:])

df_x_ss = Variable(torch.Tensor(df_x_ss))
df_y_ms = Variable(torch.Tensor(df_y_ms))
df_x_ss = torch.reshape(df_x_ss, (df_x_ss.shape[0], 1, df_x_ss.shape[1]))
train_predict = model(df_x_ss)
predicted = train_predict.data.numpy()

label_y = df_y_ms.data.numpy()

predicted = ms.inverse_transform(predicted)
label_y = ms.inverse_transform(label_y)
plt.figure(figsize=(10, 6))
plt.axvline(x=200, c='r', linestyle='--')

plt.plot(label_y, label='Actual Data')
plt.plot(predicted, label='Predicted Data')
plt.title('Time-series Prediction')
plt.legend()
plt.show()


📚 reference

  • (길벗) 딥러닝 파이토치 교과서 / 서지영 지음
  • github
profile
춘식이랑 함께하는 개발일지

2개의 댓글

comment-user-thumbnail
2024년 3월 19일

안녕하세요, 딥러닝 공부하고 있는 한 학생입니다.
GRU 공부하는데 정말 큰 도움 됐어요

근데 GRUCell에 있는 hy = newgate + inputgate (hidden - newgate) 여기서
마지막 output값으로 hy = (1-inputgate)
hidden + inputgatenewgate 의 식으로 계산하면,
hy = hideen + inputgate
(newgate - hidden) 아닌가요,,? 공부하다가 마지막 hy값이 계산한 거랑 달라서 질문 드립니다!

1개의 답글