1) 오늘은 시계열 데이터 예측을 해보려고 한다.
2) RNN에 대한 기본적인 개념과 지식은 어느 정도 알고 있다고 가정하고 시작하겠다.
3) 성능은 장담하지 않는다. (시계열 데이터 전처리 및 도메인 지식 부재)
: Kaggle API -> Import -> Data -> Dataset -> DataLoader -> Model -> Loss Function and Optimizer -> train_one_epoch() and valid_one_epoch() -> run_train()
: Colab Cell에서 다음과 같이 입력하여 Kaggle Dataset을 한 번에 다운로드할 수 있다.
import os
# os.environ을 이용하여 Kaggle API Username, Key 세팅하기
os.environ['KAGGLE_USERNAME'] = ############
os.environ['KAGGLE_KEY'] = ############
# Linux 명령어로 Kaggle API를 이용하여 데이터셋 다운로드하기 (!kaggle ~)
!kaggle datasets download -d mczielinski/bitcoin-historical-data
# Linux 명령어로 압축 해제하기
!unzip '*.zip'
압축 해제가 끝났다면, 어떤 파일들이 있는지 살펴보자.
!ls
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
: nn을 이용해서 nn.Module을 상속받아서 다음 예시처럼 Model을 빌드한다. 그래서 필요하다.
class Model(nn.Module):
def __init__(self, input_dim = 8, output_dim = 1):
super().__init__()
self.fc1 = nn.Linear()
...
: Dataset과 DataLoader는 데이터를 배치 단위로 학습할 데이터(x)와 정답 데이터(y)를 묶어서 뱉어주는 역할을 한다.
: Kaggle에 Bitcoin Historical Data 라는 데이터셋을 사용한다.
base_path = '/content/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv'
df = pd.read_csv(base_path)
df = df[df.Open.notnull()].reset_index(drop = True)
# 결측치 제거 및 index 초기화
num = int(df.shape[0] * .01)
print(num)
df = df[:num] # 1%의 데이터만 사용!
print(df.shape)
df.head()
: Dataset을 상속받아서 MyDataset 클래스를 만들어준다.
sl = 12
class MyDataset(Dataset):
def __init__(self, df = df, days = sl):
self.days = days # RNNModel에서 Sequence Length가 된다.
# x
self.x = df.iloc[:, 1:4].values # Open, High, Low
self.x = self.x / np.max(self.x) # Scaling
# y
self.y = df.iloc[:, 4:5].values # Close
self.y = self.y / np.max(self.y) # Scaling
def __len__(self):
# 전체 길이 정보를 반환.
return self.x.shape[0] - self.days
def __getitem__(self, index):
# index를 통해서 row 하나를 특정.
x = self.x[index: index + self.days]
# index ~ index + 3 -> x's Shape: [12, 3]
y = self.y[index + self.days]
# index + 3 -> y's Shape: [1]
return torch.tensor(x, dtype = torch.float), torch.tensor(y, dtype = torch.float)
# Shape: [12, 3] Shape: [1]
## 다른 데서 코드 가져옴
for fold, ( _, val_) in enumerate(skf.split(X=df, y=df['target'])):
df.loc[val_ , "kfold"] = int(fold)
# _, val_ : index 넘버
return torch.tensor(x, dtype = torch.float), torch.tensor(y, dtype = torch.float)
: 여기서는 학습에 사용할 데이터와 성능 검증에 필요한 데이터로 쪼개준다. 그리고 각 데이터에 대해서 배치 단위로 뱉어줄 수 있도록 DataLoader 객체를 각각 만들어준다. 이 과정을 prepare_loaders() 함수에 담았다.
def prepare_loaders(df = df, index_num = int(df.shape[0] * .7), bs = 128 * 2):
# train, valid split (7:3 Split)
train = df[:index_num].reset_index(drop = True)
valid = df[index_num:].reset_index(drop = True)
# train_ds, valid_ds MyDataset(Dataset)
train_ds = MyDataset(df = train)
valid_ds = MyDataset(df = valid)
# train_loader, valid_loader를 만들어준다.
# 시계열 데이터이기 때문에 Shuffle 에 대해서는 False로 주었다.
train_loader = DataLoader(train_ds, shuffle = False, batch_size = bs)
valid_loader = DataLoader(valid_ds, shuffle = False, batch_size = bs)
return train_loader, valid_loader
: Data의 Shape 추적이 가장 중요하다. 그래서 여기서도 확인해볼 필요가 있다. 아마 x에 해당하는 부분의 Shape은 [256, 12, 3], y에 해당하는 부분의 Shape은 [256, 1] 으로 나올 것이다.
## train_loader가 뱉는 배치 크기 확인해보자
data = next(iter(train_loader))
data[0].shape, data[1].shape
: torch.tensor를 비롯해서 train_loader, valid_loader에서 나오는 배치들과 Model의 layer들을 모두 GPU로 보내기 위한 코드이다. M1, Colab에서 적용할 수 있는 코드이며, '지금 GPU 쓸 수 있니? 없니?' 라고 확인 후, if else 조건문을 통해 GPU로 보내는 코드이다.
# Colab
# device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
# M1 버전
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
device
: nn.Module을 상속받아서 Model 클래스를 빌드한다.
: RNN 계열의
class RNNModel(nn.Module):
def __init__(self,
input_size = 3, # open, high, low
hidden_size = 100,
num_layers= 3,
sequence_length = sl, # sl = 12
device = device
):
super().__init__()
self.num_layers = num_layers
self.sequence_length = sequence_length
self.hidden_size = hidden_size
self.device = device
# RNN 레이어
# https://pytorch.org/docs/stable/generated/torch.nn.RNN.html
self.rnn = nn.RNN(input_size = input_size,
hidden_size = hidden_size,
num_layers = num_layers,
batch_first = True,
bidirectional = False)
# input(=x)'s shape: [bs, sl, input_size]
# input_h0 = [num_layers, bs, hidden_size]
# output(=y)'s shape: [bs, sl, hidden_size]
# output_h = [num_layers, bs, hidden_size]
# [bs, sl, hidden_size] -> [bs, sl * hidden_size]
# 1) 3차원 -> 2차원
k = self.sequence_length * self.hidden_size
# 2) Use output's Last Sequence Length
# k = 1 * self.hidden_size
# Fully Connected Layer
self.seq = nn.Sequential(
nn.Linear(k, 256),
nn.LeakyReLU(),
nn.Linear(256, 1)
# [bs, k] -> [bs, 256] -> [bs, 1]
)
def forward(self, x):
# x: [bs, sl, input_size]
bs = x.shape[0]
h0 = torch.zeros(self.num_layers, bs, self.hidden_size).to(self.device)
# h0: [num_layers, bs, hidden_size]
output, h_n = self.rnn(x, h0)
# output's shape: [bs, sl, hidden_size]
# h_n = [num_layers, bs, hidden_size]
# 1) 3차원 -> 2차원
output = output.reshape(bs, -1)
# [bs, sl, hidden_size] -> [bs, sl * hidden_size]
# 2) Use output's Last Sequence Length
# output = output[:, -1] # [bs, hidden_size]
# [bs, k] -> [bs, 256] ->[bs, 1]
y_pred = self.seq(output)
# y_pred: [bs, 1]
return y_pred
model = RNNModel().to(device) # GPU로 보내준다.
: Loss Function과 Optimizer를 선언한다.
# loss_fn도 to(device)가 가능하다. (= GPU로 보내줄 수 있다.)
loss_fn = nn.MSELoss().to(device)
# optimizer는 to(device)가 불가능하다. (= GPU로 보내줄 수 없다.)
optimizer = torch.optim.Adam(model.parameters()) # lr = 1e-3
: model이 한 epoch를 도는 동안, model이 학습되는 과정과 train_epoch_loss(=epoch당 평균 train loss)를 return 시키는 함수이다.
def train_one_epoch(model = model,
dataloader = train_loader,
loss_fn = loss_fn,
optimizer = optimizer,
device = device):
# model을 학습시키겠다고 선언
model.train()
train_loss = 0
for data in dataloader:
# train_loader = [([bs, 12, 3], [bs, 1]), ([bs, 12, 3], [bs, 1]), .... ]
x = data[0].to(device) # [bs, 12, 3] # GPU로 보내준다.
y = data[1].to(device) # [bs, 1] # GPU로 보내준다.
y_pred = model(x) # [bs, 1] # model을 통해 예측한 값
loss = loss_fn(y_pred, y) # MSE Loss
optimizer.zero_grad() # Gradient 초기화
loss.backward() # 역전파할 Gradient 구함
optimizer.step() # 역전파할 Gradient 토대로 Weight 업데이트
train_loss += loss.item()
train_epoch_loss = train_loss / len(dataloader) # epoch 당 train loss 평균값
return train_epoch_loss
: model이 한 epoch를 도는 동안, valid_epoch_loss(=epoch당 평균 valid loss)를 return 시키는 함수이다.
# @torch.no_grad() : 데코레이터 형태의 'model을 학습 시키지 않겠다는 필수 의지 표명 2'
# --> 근데 함수형일 때 쓸 수 있는 것으로 알고 있다.
# --> model을 학습 시키지 않겠다는 필수 의지 표명 1과 2 중 하나만 써도 무방하다.
@torch.no_grad()
def valid_one_epoch(model = model,
dataloader = valid_loader,
loss_fn = loss_fn,
device = device):
# 여기서는 optimizer가 필요없다. 학습을 하지 않기 때문
# model을 학습 안 시키겠다고 선언
model.eval()
# 예측값들과 실제값들을 담을 빈 리스트를 선언
# 예측값과 실제값을 볼 수 있는 시각화 용도
preds = []
trues = []
valid_loss = 0
# model을 학습 시키지 않겠다는 필수 의지 표명 1
with torch.no_grad():
for data in dataloader:
# valid_loader = [([bs, 12, 3], [bs, 1]), ([bs, 12, 3], [bs, 1]), .... ]
x = data[0].to(device) # [bs, 12, 3] # GPU로 보내준다.
y = data[1].to(device) # [bs, 1] # GPU로 보내준다.
y_pred = model(x) # [bs, 1] # model을 통해 예측한 값
loss = loss_fn(y_pred, y) # MSE Loss - 성능평가
valid_loss += loss.item()
# 예측값들 줍줍
preds.append(y_pred)
# 실제값들 줍줍
trues.append(y)
valid_epoch_loss = valid_loss / len(dataloader) # epoch 당 valid loss 평균값
# 줍줍한 예측값들을 concat
preds_cat = torch.cat(preds, dim = 0)
# 줍줍한 실제값들을 concat
trues_cat = torch.cat(trues, dim = 0)
return valid_epoch_loss, trues_cat, preds_cat
: 여기서는 전반적인 train 과정을 담았다.
def run_train(model = model,
loss_fn = loss_fn,
optimizer = optimizer,
train_loader = train_loader,
valid_loader = valid_loader,
device = device):
n_epochs = 200
print_iter =20
best_model = None
early_stop = 30
lowest_loss, lowest_epoch = np.inf, np.inf
train_hs, valid_hs = [], []
result = dict()
for epoch in range(n_epochs):
train_loss = train_one_epoch(model = model, dataloader = train_loader, loss_fn = loss_fn, optimizer=optimizer, device = device)
valid_loss, trues_cat, preds_cat = valid_one_epoch(model = model, dataloader = valid_loader, loss_fn = loss_fn, device = device)
# rues_cat, preds_cat은 각각 한 에폭에서 예측값, 실제값들을 담은 리스트.
# Valid 데이터 대상
# 매 epoch 마다 train_loss, valid_loss를 줍줍
train_hs.append(train_loss)
valid_hs.append(valid_loss)
# monitoring: print_iter 주기만큼 print문을 찍어줍니다.
if (epoch + 1) % print_iter == 0:
print("Epoch:%d, train_loss=%.3e, valid_loss=%.3e, lowest_loss=%.3e" % (epoch+1,train_loss, valid_loss, lowest_loss))
# lowest_loss 갱신
if valid_loss < lowest_loss:
lowest_loss = valid_loss
lowest_epoch = epoch
# model save
torch.save(model.state_dict(), "./model_rnn.pth")
else:
if early_stop >0 and lowest_epoch + early_stop < epoch + 1:
print("삽질 중")
break
print("The Best Validation Loss=%.3e at %d Epoch" % (lowest_loss, lowest_epoch))
# model load
model.load_state_dict(torch.load("./model_rnn.pth"))
# result 라는 딕셔너리를 생성해서 train_hs, valid_hs 를 담아줍니다.
result["Train Loss"] = train_hs
result["Valid Loss"] = valid_hs
# trues_cat(y값들), preds_cat(y_pred값들): 마지막 epoch에서의 값들만 줍줍하여, result에 넣는다.
# 실제값들과 예측값을 시각화하여 보기 위함.
# 마지막 에포크의 예측값들과 실제값들이 담길 것
result["Trues"] = trues_cat
result["Preds"] = preds_cat
return model, result
: 다음 코드로 학습 시작!
model, result = run_train()
## Visualization: Train Loss, Valid Loss
plot_from = 0
plt.figure(figsize=(20, 10))
plt.title("Train/Valid Loss History", fontsize = 20)
plt.plot(
range(0, len(result['Train Loss'][plot_from:])),
result['Train Loss'][plot_from:],
label = 'Train Loss'
)
plt.plot(
range(0, len(result['Valid Loss'][plot_from:])),
result['Valid Loss'][plot_from:],
label = 'Valid Loss'
)
plt.legend()
# plt.yscale('log')
plt.grid(True)
plt.show()
## Visualization: True Values and Predicted Values
plot_from = 0
plt.figure(figsize=(20, 10))
plt.title("Trues, Preds History", fontsize = 20)
plt.plot(
range(0, len(result['Trues'].detach().cpu().numpy()[plot_from:])), # 마지막 에포크에서의 실제값들입니다.
result['Trues'].detach().cpu().numpy()[plot_from:],
label = 'Trues'
)
plt.plot(
range(0, len(result['Preds'].detach().cpu().numpy()[plot_from:])), # 마지막 에포크에서의 예측값들입니다.
result['Preds'].detach().cpu().numpy()[plot_from:],
label = 'Preds'
)
plt.legend()
# plt.yscale('log')
plt.grid(True)
plt.show()
보면 알겠지만, 성능은 장담하지 못 한다. 필자의 경우, 시계열 예측에 필요한 전처리 방법 혹은 비트코인 주가 예측에 필요한 도메인 지식 또한 없다.