장단기 메모리(LSTM)은 RNN의 기울기 소멸 문제를 해결하기 위해, 망각 게이트, 입력 게이트, 출력 게이트라는 새로운 요소를 은닉층의 각 뉴런에 추가했다.
LSTM 구조를 순전파
와 역전파
과정으로 살펴보자.
순전파
망각 게이트(forget gate)
과거 정보를 어느 정도 기억할지 결정한다. 과거 정보와 현재 데이터를 입력받아 시그모이드를 취한 후, 그 값을 과거 정보에 곱한다. 시그모이드 출력이 0이면 과거 정보를 버리고, 1이면 과거 정보를 보존한다.
0과 1 사이 출력값을 가지는 과 를 입력 값으로 받는다. 이 때 는 새로운 입력 값이고, 은 이전 은닉층에서 입력되는 값이다. 즉, 과 를 이용하여 이전 상태 정보를 현재 메모리에 반영할지 결정한다.
망각 게이트의 수식은 다음과 같다.
입력 게이트(input gate)
현재 정보를 기억하기 위해 만들어졌다. 과거 정보와 현재 데이터를 입력받아 시그모이드와 하이퍼볼릭 탄젠트 함수를 기반으로 현재 정보에 대한 보존량을 결정한다.
즉, 현재 메모리에 새로운 정보를 반영할 지 결정하는 역할을 한다. 계산값이 1이면 입력 가 들어올 수 있도록 허용하고, 0이면 차단한다.
입력 게이트의 수식은 다음과 같다.
셀(cell)
각 단계에 대한 은닉 노드(hidden node)를 메모리 셀
이라고 한다. ‘총합’을 사용하여 셀 값을 반영하며, 이를 통해 기울기 소멸 문제를 해결한다. 망각 게이트와 입력 게이트의 이전 단계 셀 정보를 계산하여 현재 단계의 셀 상태(cell state)를 업데이트 한다.
다음은 셀에 대한 수식이다.
역전파
LSTM은 셀을 통해 역전파를 수행하므로 중단 없는 기울기(uninterrupted gradient flow)라고도 한다. 최종 오차는 모든 노드에 전파되는데, 이 때 셀을 통해 중단 없이 전파된다.
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dataset
from torch.autograd import Variable
from torch.nn import Parameter
from torch import Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
import math
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
cuda = True if torch.cuda.is_available() else False
Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
torch.manual_seed(125)
if torch.cuda.is_available() :
torch.cuda.manual_seed_all(125)
import torchvision.transforms as transforms
mnist_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5), (1.0,))
])
from torchvision.datasets import MNIST
download_root = '../080289-main/chap07/MNIST_DATASET'
train_dataset = MNIST(download_root, transform=mnist_transform,
train=True, download=True)
valid_dataset = MNIST(download_root, transform=mnist_transform,
train=False, download=True)
test_dataset = MNIST(download_root, transform=mnist_transform,
train=False, download=True)
batch_size = 64
train_loader = DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True)
valid_loader = DataLoader(dataset=test_dataset,
batch_size=batch_size,
shuffle=True)
test_loader = DataLoader(dataset=test_dataset,
batch_size=batch_size,
shuffle=True)
데이터셋 내려받고 로드
batch_size=100
n_iters=6000
num_epochs=n_iters/(len(train_dataset) / batch_size)
num_epochs = int(num_epochs)
변수 값 지정
class LSTMCell(nn.Module) :
def __init__(self, input_size, hidden_size, bias=True) :
super(LSTMCell, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.bias = bias
self.x2h = nn.Linear(input_size, 4*hidden_size, bias=bias)
self.h2h = nn.Linear(hidden_size, 4*hidden_size, bias=bias)
self.reset_parameters()
def reset_parameters(self) :
std = 1.0 / math.sqrt(self.hidden_size)
for w in self.parameters() :
w.data.uniform_(-std, std)
def forward(self, x, hidden) :
hx, cx = hidden
x = x.view(-1, x.size(1))
gates = self.x2h(x) + self.h2h(hx)
gates = gates.squeeze()
ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate) # 입력 게이트에 시그모이드 적용
forgetgate = F.sigmoid(forgetgate) # 망각 게이트에 시그모이드 적용
cellgate = F.tanh(cellgate) # 셀 게이트에 탄젠트 적용
outgate = F.sigmoid(outgate) # 출력 게이트에 시그모이드 적용
cy = torch.mul(cx, forgetgate) + torch.mul(ingate, cellgate)
hy = torch.mul(outgate, F.tanh(cy))
return (hy, cy)
LSTM cell 네트워크 구축
class LSTMModel(nn.Module) :
def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, bias=True) :
super(LSTMModel, self).__init__()
self.hidden_dim = hidden_dim
self.layer_dim = layer_dim
self.lstm = LSTMCell(input_dim, hidden_dim, layer_dim)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x) :
if torch.cuda.is_available() :
h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
else :
h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
if torch.cuda.is_available() :
c0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
else :
c0 = Variable(torch.zeros(self.layer_dim, x.size(0), hidden_dim))
outs = []
cn = c0[0,:,:]
hn = h0[0,:,:]
for seq in range(x.size(1)) :
hn, cn = self.lstm(x[:, seq, :], (hn, cn))
outs.append(hn)
out = outs[-1].squeeze()
out = self.fc(out)
return out
LSTM 네트워크 구성
input_dim=28
hidden_dim=128
layer_dim=1
output_dim=18
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim)
if torch.cuda.is_available() :
model.cuda()
criterion = nn.CrossEntropyLoss()
learning_rate =0.1
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
옵티마이저 및 손실함수 정의
seq_dim=28
loss_list = []
iter = 0
for epoch in range(num_epochs) :
for i, (images ,labels) in enumerate(train_loader) :
if torch.cuda.is_available() :
images = Variable(images.view(-1, seq_dim, input_dim).cuda())
labels = Variable(labels.cuda())
else :
images = Variable(images.view(-1, seq_dim, input_dim))
labels = Variable(labels)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
if torch.cuda.is_available() :
loss.cuda()
loss.backward()
optimizer.step()
loss_list.append(loss.item())
iter += 1
if iter % 500 == 0 :
correct = 0
total = 0
for images, labels in valid_loader :
if torch.cuda.is_available() :
images = Variable(images.view(-1, seq_dim, input_dim).cuda())
else :
images = Variable(images.view(-1, seq_dim, input_dim))
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
if torch.cuda.is_available() :
correct += (predicted.cpu() == labels.cpu()).sum()
else :
correct += (predicted == labels).sum()
accuracy = 100 * correct / total
print(f'Iteration : {iter} Loss : {loss.item()} Accuracy : {accuracy}')
모델 학습 및 성능 체크
def evaluate(model, val_iter) :
corrects, total, total_loss = 0, 0, 0
model.eval()
for images, labels in val_iter :
if torch.cuda.is_available() :
images = Variable(images.view(-1, seq_dim, input_dim).cuda())
else :
images = Variable(images.view(-1, seq_dim, input_dim).to(device))
logit = model(images).to(device)
loss = F.cross_entropy(logit, labels, reduction='sum')
_, predicted = torch.max(logit.data, 1)
total += labels.size(0)
total_loss += loss.item()
corrects += (predicted == labels).sum()
avg_loss = total_loss / len(val_iter.dataset)
avg_accuracy = corrects / total
return avg_loss, avg_accuracy
테스트 데이터셋 검증
test_loss, test_acc = evaluate(model, test_loader)
print(f'Test Loss : {test_loss:5.2f} | Test Accuracy : {test_acc:5.2f}')
예측 결과 출력
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.autograd import Variable
from tqdm import tqdm_notebook
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
data = pd.read_csv('../080289-main/chap07/data/SBUX.csv')
print(data.dtypes)
print(data)
스타벅스 주가 데이터를 불러온다.
data['Date'] = pd.to_datetime(data['Date'])
data.set_index('Date', inplace=True)
data['Volume'] = data['Volume'].astype(float)
X = data.iloc[:, :-1]
y = data.iloc[:, 5:6]
print(X)
print(y)
레이블을 분리한다.
ms = MinMaxScaler()
ss = StandardScaler()
X_ss = ss.fit_transform(X)
y_ms = ms.fit_transform(y)
X_train = X_ss[:200, :]
X_test = X_ss[200:, :]
y_train = y_ms[:200, :]
y_test = y_ms[200:, :]
print('Training Shape :', X_train.shape, y_train.shape)
print('Testing Shape :', X_test.shape, y_test.shape)
스케일러를 적용한다.
# 데이터셋 형태, 크기 조정
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))
y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))
X_train_tensors_f = torch.reshape(X_train_tensors, (X_train_tensors.shape[0],
1, X_train_tensors.shape[1]))
X_test_tensors_f = torch.reshape(X_test_tensors, (X_test_tensors.shape[0],
1, X_test_tensors.shape[1]))
print('Training Shape :', X_train_tensors_f.shape, y_train_tensors.shape)
print('Testing Shape :', X_test_tensors_f.shape, y_test_tensors.shape)
데이터셋 형태와 크기를 조정한다.
class LSTM(nn.Module) :
def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length) :
super(LSTM, self).__init__()
self.num_classes = num_classes
self.num_layers = num_layers
self.input_size = input_size
self.hidden_size = hidden_size
self.seq_length = seq_length
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
num_layers=num_layers, batch_first=True)
self.fc_1 = nn.Linear(hidden_size, 128)
self.fc = nn.Linear(128, num_classes)
self.relu = nn.ReLU()
def forward(self, x) :
h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
output, (hn, cn) = self.lstm(x, (h_0, c_0))
hn = hn.view(-1, self.hidden_size)
out = self.relu(hn)
out = self.fc_1(out)
out = self.relu(out)
out = self.fc(out)
return out
네트워크를 정의한다.
# 변수값 설정
num_epochs = 1000
lr = 0.0001
input_size=5
hidden_size=2
num_layers=1
num_classes=1
model = LSTM(num_classes, input_size, hidden_size, num_layers, X_train_tensors_f.shape[1])
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
변수값을 설정한다.
for epoch in range(num_epochs) :
outputs = model.forward(X_train_tensors_f)
optimizer.zero_grad()
loss = criterion(outputs, y_train_tensors)
loss.backward()
optimizer.step()
if epoch % 100 == 0 :
print(f'Epoch : {epoch}, loss : {loss.item():1.5f}')
모델을 학습한다.
df_x_ss = ss.transform(data.iloc[:, :-1])
df_y_ms = ms.transform(data.iloc[:, -1:])
df_x_ss = Variable(torch.Tensor(df_x_ss))
df_y_ms = Variable(torch.Tensor(df_y_ms))
df_x_ss = torch.reshape(df_x_ss, (df_x_ss.shape[0], 1, df_x_ss.shape[1]))
train_predict = model(df_x_ss)
predicted = train_predict.data.numpy()
label_y = df_y_ms.data.numpy()
predicted = ms.inverse_transform(predicted)
label_y = ms.inverse_transform(label_y)
plt.figure(figsize=(10, 6))
plt.axvline(x=200, c='r', linestyle='--')
plt.plot(label_y, label='Actual Data')
plt.plot(predicted, label='Predicted Data')
plt.title('Time-Series Prediction')
plt.legend()
plt.show()
예측 결과를 출력한다.