pytorch로 LSTM 구현하기

정예슬·2022년 6월 21일
3

pytorch

목록 보기
13/14

LSTM(Long-short Term Memory)


장단기 메모리(LSTM)은 RNN의 기울기 소멸 문제를 해결하기 위해, 망각 게이트, 입력 게이트, 출력 게이트라는 새로운 요소를 은닉층의 각 뉴런에 추가했다.

LSTM 구조를 순전파역전파 과정으로 살펴보자.

  • 순전파

    1. 망각 게이트(forget gate)

      과거 정보를 어느 정도 기억할지 결정한다. 과거 정보와 현재 데이터를 입력받아 시그모이드를 취한 후, 그 값을 과거 정보에 곱한다. 시그모이드 출력이 0이면 과거 정보를 버리고, 1이면 과거 정보를 보존한다.

      0과 1 사이 출력값을 가지는 ht1h_{t-1}xtx_t를 입력 값으로 받는다. 이 때 xtx_t는 새로운 입력 값이고, ht1h_{t-1}은 이전 은닉층에서 입력되는 값이다. 즉, ht1h_{t-1}xtx_t를 이용하여 이전 상태 정보를 현재 메모리에 반영할지 결정한다.

      망각 게이트의 수식은 다음과 같다.

      ft=σ(wf[ht1,xt])f_t = σ(w_f[h_{t-1}, x_t])

      ct=ft×ct1c_t = f_t \times c_{t-1}

    2. 입력 게이트(input gate)

      현재 정보를 기억하기 위해 만들어졌다. 과거 정보와 현재 데이터를 입력받아 시그모이드하이퍼볼릭 탄젠트 함수를 기반으로 현재 정보에 대한 보존량을 결정한다.

      즉, 현재 메모리에 새로운 정보를 반영할 지 결정하는 역할을 한다. 계산값이 1이면 입력 xtx_t가 들어올 수 있도록 허용하고, 0이면 차단한다.

      입력 게이트의 수식은 다음과 같다.

      it=σ(wi[ht1,xt])i_t = σ(w_i[h_{t-1}, x_t])

      c~t=tanh(wc[ht1,xt])\tilde c_t = tanh(w_c[h_{t-1}, x_t])

      ct=ct1+it×ct~c_t = c_{t-1} + i_t \times \tilde{c_t}

    3. 셀(cell)

      각 단계에 대한 은닉 노드(hidden node)를 메모리 셀이라고 한다. ‘총합’을 사용하여 셀 값을 반영하며, 이를 통해 기울기 소멸 문제를 해결한다. 망각 게이트와 입력 게이트의 이전 단계 셀 정보를 계산하여 현재 단계의 셀 상태(cell state)를 업데이트 한다.

      다음은 셀에 대한 수식이다.

      ft=σ(wf[ht1,xt])f_t = σ(w_f[h_{t-1}, x_t])

      ct=ct1+it×c~tc_t = c_{t-1} + i_t \times \tilde c_t

  • 역전파

    LSTM은 셀을 통해 역전파를 수행하므로 중단 없는 기울기(uninterrupted gradient flow)라고도 한다. 최종 오차는 모든 노드에 전파되는데, 이 때 셀을 통해 중단 없이 전파된다.


LSTM 구현(pytorch)

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dataset
from torch.autograd import Variable
from torch.nn import Parameter
from torch import Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
import math
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
cuda = True if torch.cuda.is_available() else False

Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor

torch.manual_seed(125)
if torch.cuda.is_available() : 
    torch.cuda.manual_seed_all(125)
import torchvision.transforms as transforms

mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5), (1.0,))
])
from torchvision.datasets import MNIST

download_root = '../080289-main/chap07/MNIST_DATASET'

train_dataset = MNIST(download_root, transform=mnist_transform, 
                      train=True, download=True)
valid_dataset = MNIST(download_root, transform=mnist_transform,
                     train=False, download=True)
test_dataset = MNIST(download_root, transform=mnist_transform,
                    train=False, download=True)
batch_size = 64
train_loader = DataLoader(dataset=train_dataset, 
                         batch_size=batch_size,
                         shuffle=True)
valid_loader = DataLoader(dataset=test_dataset, 
                         batch_size=batch_size,
                         shuffle=True)
test_loader = DataLoader(dataset=test_dataset, 
                         batch_size=batch_size,
                         shuffle=True)

데이터셋 내려받고 로드

batch_size=100
n_iters=6000
num_epochs=n_iters/(len(train_dataset) / batch_size)
num_epochs = int(num_epochs)

변수 값 지정

LSTM Cell

class LSTMCell(nn.Module) :
    def __init__(self, input_size, hidden_size, bias=True) :
        super(LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.x2h = nn.Linear(input_size, 4*hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, 4*hidden_size, bias=bias)
        self.reset_parameters()
        
    def reset_parameters(self) :
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters() :
            w.data.uniform_(-std, std)
            
    def forward(self, x, hidden) :
        hx, cx = hidden
        x = x.view(-1, x.size(1))
        
        gates = self.x2h(x) + self.h2h(hx)
        gates = gates.squeeze()
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
        
        ingate = F.sigmoid(ingate) # 입력 게이트에 시그모이드 적용
        forgetgate = F.sigmoid(forgetgate) # 망각 게이트에 시그모이드 적용
        cellgate = F.tanh(cellgate) # 셀 게이트에 탄젠트 적용
        outgate = F.sigmoid(outgate) # 출력 게이트에 시그모이드 적용
        
        cy = torch.mul(cx, forgetgate) + torch.mul(ingate, cellgate)
        hy = torch.mul(outgate, F.tanh(cy))
        
        return (hy, cy)

LSTM cell 네트워크 구축

LSTM cell 네트워크

class LSTMModel(nn.Module) :
   def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, bias=True) :
       super(LSTMModel, self).__init__()
       self.hidden_dim = hidden_dim
       
       self.layer_dim = layer_dim
       self.lstm = LSTMCell(input_dim, hidden_dim, layer_dim)
       self.fc = nn.Linear(hidden_dim, output_dim)
       
   def forward(self, x) :
       if torch.cuda.is_available() :
           h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
       else :
           h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
           
       if torch.cuda.is_available() :
           c0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
       else :
           c0 = Variable(torch.zeros(self.layer_dim, x.size(0), hidden_dim))
           
       outs = []
       cn =  c0[0,:,:]
       hn = h0[0,:,:]
       
       for seq in range(x.size(1)) :
           hn, cn = self.lstm(x[:, seq, :], (hn, cn))
           outs.append(hn)
           
       out = outs[-1].squeeze()
       out = self.fc(out)
       return out

LSTM 네트워크 구성

input_dim=28
hidden_dim=128
layer_dim=1
output_dim=18

model =  LSTMModel(input_dim, hidden_dim, layer_dim, output_dim) 
if torch.cuda.is_available() :
   model.cuda()
   
criterion = nn.CrossEntropyLoss()
learning_rate =0.1
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

옵티마이저 및 손실함수 정의

seq_dim=28
loss_list = []
iter = 0
for epoch in range(num_epochs) :
   for i, (images ,labels) in enumerate(train_loader) :
       if torch.cuda.is_available() :
           images = Variable(images.view(-1, seq_dim, input_dim).cuda())
           labels = Variable(labels.cuda())
       
       else :
           images = Variable(images.view(-1, seq_dim, input_dim))
           labels = Variable(labels)
           
       optimizer.zero_grad()
       outputs = model(images)
       loss = criterion(outputs, labels)
       
       if torch.cuda.is_available() :
           loss.cuda()
           
       loss.backward()
       optimizer.step()
       loss_list.append(loss.item())
       iter += 1
       
       
       if iter % 500 == 0 :
           correct = 0
           total = 0
           
           for images, labels in valid_loader :
               
               if torch.cuda.is_available() :
                   images = Variable(images.view(-1, seq_dim, input_dim).cuda())
               else :
                   images = Variable(images.view(-1, seq_dim, input_dim))
                   
               outputs = model(images)
               _, predicted = torch.max(outputs.data, 1)
               
               total += labels.size(0)
               if torch.cuda.is_available() : 
                   correct += (predicted.cpu() == labels.cpu()).sum()
                   
               else :
                   correct += (predicted == labels).sum()
                   
           accuracy = 100 * correct / total
           print(f'Iteration : {iter} Loss : {loss.item()} Accuracy : {accuracy}')
           

모델 학습 및 성능 체크

def evaluate(model, val_iter) :
    corrects, total, total_loss = 0, 0, 0
    model.eval()
    for images, labels in val_iter :
        if torch.cuda.is_available() :
            images = Variable(images.view(-1, seq_dim, input_dim).cuda())
        else :
            images = Variable(images.view(-1, seq_dim, input_dim).to(device))
            
        logit = model(images).to(device)
        loss = F.cross_entropy(logit, labels, reduction='sum')
        _, predicted = torch.max(logit.data, 1)
        total += labels.size(0)
        total_loss += loss.item()
        corrects += (predicted == labels).sum()
        
    avg_loss = total_loss / len(val_iter.dataset)
    avg_accuracy = corrects / total
    return avg_loss, avg_accuracy

테스트 데이터셋 검증

test_loss, test_acc = evaluate(model, test_loader)
print(f'Test Loss : {test_loss:5.2f} | Test Accuracy : {test_acc:5.2f}')

예측 결과 출력


LSTM 계층 구현

import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.autograd import Variable
from tqdm import tqdm_notebook
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

data = pd.read_csv('../080289-main/chap07/data/SBUX.csv')
print(data.dtypes)
print(data)

스타벅스 주가 데이터를 불러온다.

data['Date'] = pd.to_datetime(data['Date'])
data.set_index('Date', inplace=True)
data['Volume'] = data['Volume'].astype(float)
X = data.iloc[:, :-1]
y = data.iloc[:, 5:6]
print(X)
print(y)

레이블을 분리한다.

ms = MinMaxScaler()
ss = StandardScaler()

X_ss = ss.fit_transform(X)
y_ms = ms.fit_transform(y)

X_train = X_ss[:200, :]
X_test = X_ss[200:, :]

y_train = y_ms[:200, :]
y_test = y_ms[200:, :]

print('Training Shape :', X_train.shape, y_train.shape)
print('Testing Shape :', X_test.shape, y_test.shape)

스케일러를 적용한다.

# 데이터셋 형태, 크기 조정
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))

y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))

X_train_tensors_f = torch.reshape(X_train_tensors, (X_train_tensors.shape[0],
                                                  1, X_train_tensors.shape[1]))
X_test_tensors_f = torch.reshape(X_test_tensors, (X_test_tensors.shape[0],
                                                 1, X_test_tensors.shape[1]))

print('Training Shape :', X_train_tensors_f.shape, y_train_tensors.shape)
print('Testing Shape :', X_test_tensors_f.shape, y_test_tensors.shape)

데이터셋 형태와 크기를 조정한다.

class LSTM(nn.Module) :
    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length) :
        super(LSTM, self).__init__()
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.seq_length = seq_length
        
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                           num_layers=num_layers, batch_first=True)
        self.fc_1 = nn.Linear(hidden_size, 128)
        self.fc = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()
        
    def forward(self, x) :
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        output, (hn, cn) = self.lstm(x, (h_0, c_0))
        hn = hn.view(-1, self.hidden_size)
        out = self.relu(hn)
        out = self.fc_1(out)
        out = self.relu(out)
        out = self.fc(out)
        return out

네트워크를 정의한다.

# 변수값 설정
num_epochs = 1000
lr = 0.0001

input_size=5
hidden_size=2
num_layers=1

num_classes=1
model = LSTM(num_classes, input_size, hidden_size, num_layers, X_train_tensors_f.shape[1])

criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

변수값을 설정한다.

for epoch in range(num_epochs) :
    outputs = model.forward(X_train_tensors_f)
    optimizer.zero_grad()
    loss = criterion(outputs, y_train_tensors)
    loss.backward()
    
    optimizer.step()
    if epoch % 100 == 0 :
        print(f'Epoch : {epoch}, loss : {loss.item():1.5f}')

모델을 학습한다.

df_x_ss = ss.transform(data.iloc[:, :-1])
df_y_ms = ms.transform(data.iloc[:, -1:])

df_x_ss = Variable(torch.Tensor(df_x_ss))
df_y_ms = Variable(torch.Tensor(df_y_ms))
df_x_ss = torch.reshape(df_x_ss, (df_x_ss.shape[0], 1, df_x_ss.shape[1]))
train_predict = model(df_x_ss)
predicted = train_predict.data.numpy()
label_y = df_y_ms.data.numpy()

predicted = ms.inverse_transform(predicted)
label_y = ms.inverse_transform(label_y)
plt.figure(figsize=(10, 6))
plt.axvline(x=200, c='r', linestyle='--')

plt.plot(label_y, label='Actual Data')
plt.plot(predicted, label='Predicted Data')
plt.title('Time-Series Prediction')
plt.legend()
plt.show()

예측 결과를 출력한다.


📚 reference

  • (길벗) 딥러닝 파이토치 교과서 / 서지영 지음
  • github
profile
춘식이랑 함께하는 개발일지.. 그런데 이제 먼작귀를 곁들인

0개의 댓글