순환신경망 (RNN - Recurrent Neural Network)
RNN의 은닉 상태가 이전 시점의 정보를 기억하고 전달하기 때문에 시퀀스 데이터의 패턴을 파악하는 데 유용하다.
but
기본적인 RNN은 긴 시퀀스에 대해 장기 의존성을 처리하는데 어려움을 겪을 수 있다. 이는 긴 시퀀스에서 과거 정보를 적절히 기억하지 못해 정보가 소실되거나 불필요하게 강조되는 문제를 의미한다.
⇒ 그래서 긴 시퀀스에 대해서도 장기 의존성 문제를 해결하는 LSTM과 GRU와 같은 변형된 RNN 모델들이 주로 활용된다.
RNN을 이용한 Sequence data 처리 예
RNN (Recurrent Neural Networks)
Sequence가 긴 경우 앞쪽의 기억이 뒤쪽에 영향을 미치지 못해 학습능력이 떨어진다.
LSTM (Long Short Term Memory)
오래 기억할 것은 유지하고 잊어버릴 것은 빨리 잊어버리자
LSTM은 장기 의존성을 더욱 효과적으로 기억하고 전달하기 위한 메커니즘을 가지고 있다.
LSTM은 기본적으로 은닉 상태(hidden state)와 장기 상태(long-term state)라는 두 가지 상태를 가지고 있다.
LSTM은 게이트라 불리는 구조를 사용하여 장기 상태와 은닉 상태를 제어하고 업데이트한다.
장기 상태(long-term state)는 과거 정보를 기억하는데 사용
은닉 상태(hidden state)는 현재 입력과 과거 정보를 바탕으로 계산되는 새로운 정보를 담고 있다,
LSTM 이전 기억
LSTM의 구조
Forget gate
입력된 cell state에서 얼마나 잊어 버릴지를 처리.
입력: 이전 시점의 은닉 상태 (h_t-1)와 현재 시점의 입력 (x_t)
출력: 0과 1 사이의 값으로 이루어진 벡터 f_t
계산 방법:
forget gate의 계산은 시그모이드 함수를 사용하여 이루어집니다.
f_t = sigmoid(W_f * [h_t-1, x_t] + b_f)
Input gate
현재 sequence의 입력데이터를 cell state(Long term memory)에 더한다.
Input gate는 현재 입력과 이전 장기 상태로부터 어떤 정보를 기억해야 할지를 결정한다.
입력: 이전 시점의 은닉 상태 (h_t-1)와 현재 시점의 입력 (x_t)
출력: 0과 1 사이의 값으로 이루어진 벡터 i_t
계산 방법:
input gate의 계산은 시그모이드 함수를 사용하여 이루어집니다.
i_t = sigmoid(W_i * [h_t-1, x_t] + b_i)
Output gate
- 현재 sequnce의 입력데이터를 처리해서 output으로 출력(output으로도 보내주고 hidden state로 보내진다.)
- Output gate는 현재 시점의 입력과 이전 장기 상태를 바탕으로 현재 은닉 상태를 어떻게 계산할지를 결정한다.
- 입력: 이전 시점의 은닉 상태 (h_t-1)와 현재 시점의 입력 (x_t)
- 출력: 0과 1 사이의 값으로 이루어진 벡터 o_t
계산 방법:
output gate의 계산은 시그모이드 함수를 사용하여 이루어집니다.
o_t = sigmoid(W_o * [h_t-1, x_t] + b_o)
LSTM_stockprice
- Yahoo Finance 에서 주가 데이터 다운로드 (https://finance.yahoo.com/)
이제 read_csv해서 실습시작!!
4일치의 주가를 보고 5일치의 close(마감주가)를 예측하는 방식이다.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
df = pd.read_csv('005930.KS.csv')
df.shape # (5913, 7)
df.head() # Volume: 거래량
df.info() # object 타입을 Datetime타입으로 변환해야한다는 것을 알 수 있다.
#Date 컬럼을 Datetime 타입으로 변환 후 index로 변환
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True) # Date컬럼을 행인덱스로 빼줄것이다. 왜? 굳이 필요없어서
df.head() # 다시 확인해보자. 잘 반영되었다.
df[['Open', 'Close']].plot(figsize=(20,5), alpha=0.5)
df[['Open', 'Close']].iloc[:50].plot(figsize=(20, 5), marker='*'); # iloc: 앞의 50만 보겠다.
전처리
df.drop(columns=['Adj Close'], inplace=True) # Adj Close 컬럼 제거
df.head()
⇒ 입력값이 5개이니 weight도 5개이다.
X, y 분리
df_y = df['Close'].to_frame() # Series.to_frame(): Series를 DateFrame으로 변환(1차원->2차원 구조)
df_x = df
Scaling - MinMaxScaler
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
y_scaler = MinMaxScaler()
X = scaler.fit_transform(df_x)
y = y_scaler.fit_transform(df_y)
X.shape, y.shape # ((5913, 5), (5913, 1))
전처리 후 이제 묶어주기!
연속된 날짜를 지정
연속된 날짜가 5인 경우
day1~day5의 데이터를 보고 day6를 예측하도록
한칸씩한칸씩 움직이면서 예측하도록한 것이다.
⇒ 이런 방식을 사용해 우리는 연속된 날짜를 50으로 해서 한칸씩 움직이면서 예측하도록 한다는 것이다.
window_size = 50 # 몇일 치를 한 단위로 묶을지 설정(50일치)
data_x = [] # input data들을 저장할 리스트 ex) [[1-50],[2,51,[3,52], ...]
data_y = [] # output data를 저장할 리스트 / 예측된 값인 [[51],[52],[53]]의 close 가격
# y.size - 50
for i in range(0, y.size - window_size): # windowsize +1개의 행(x: window_sizw행, y:1행)이 확보될때까지 반복문을 실행.
# 뒤로갈수록 windowsize를 반복시 예측될 close가 없을 수 있으므로
_X = X[i:i+window_size] # 처음에는 [0:50]-0~49까지 반복 / [1:51] - 1~50까지 반복
_y = y[i+window_size] # 50 / 51
data_x.append(_X)
data_y.append(_y)
np.shape(data_x) # (5863, 50, 5)
np.shape(data_y) # (5863, 1)
Train, Test 분리
Train : 8, Test: 2의 비율로 나눈다.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(data_x, data_y,
test_size=0.2,
random_state=0)
type(X_train) # list
X_train = np.array(X_train)
X_test = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)
type(X_train) # numpy.ndarray
import
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
하이퍼파라미터 정의
LEARNING_RATE = 0.001
N_EPOCH = 100 # 우선 10 로 실행해봄
N_BATCH = 200
Dataset 생성
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))\
.shuffle(X_train.shape[0])\
.batch(N_BATCH, drop_remainder=True)
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(N_BATCH)
모델 생성, 컴파일
def get_model():
model = keras.Sequential()
model.add(layers.InputLayer(input_shape=(window_size, 5)))
model.add(layers.LSTM(32, return_sequences=False, activation='tanh')) # 32가지의 특성을 찾아봐
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(1))
model.compile(optimizer=keras.optimizers.Adam(LEARNING_RATE), loss='mse')
return model
model = get_model()
model.summary()
모델 학습, 평가
hist = model.fit(train_dataset,
epochs=N_EPOCH,
validation_data=test_dataset)
결과 시각화
plt.plot(hist.epoch, hist.history['loss'])
plt.plot(hist.epoch, hist.history['val_loss']);
plt.ylim(0,0.001)
plt.show()
*최종평가*
model.evaluate(test_dataset)
X[-50].shape # (5,)
X[-50:][np.newaxis, ...].shape # (1, 50, 5)
pred = model.predict(new_data) # 1/1 [==============================] - 0s 397ms/step
pred # minmax scaling # array([[0.7722465]], dtype=float32)
y_scaler.inverse_transform(pred) # array([[70896.195]], dtype=float32)
Generator와 Discriminator 네트워크
생성자생성후 판별자가 학습 시 분류 오차가 크면 가짜오차를 발견하지 못한것이다. 분류오차가 작을 수록 가짜 샘플을 잘 판별한 것이다.
⇒ 오차 역전파를 이용해 판별자를 업데이트한다.
⇒ 생성자를 생성 후 판별자가 학습시 분류오차가 0이라면 내가 만든 가짜 샘플을 너무 잘 판별한것이다. 분류오차가 높을 수록 가짜샘플을 잘 생성한것이다.
⇒ 생성자업데이트는 생성자가 한다.
다음으로는 GAN의 기본 구조에 딥러닝의 기술인 합성곱 신경망(Convolutional Neural Network, CNN)을 적용한 GAN에 대하여 알아볼 것인데 그전에 Down sampling과 Up sampling에 대하여 먼저 알아보자.
Down sampling과 Up sampling
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
X = np.arange(1, 5).reshape(1, 2, 2, 1)
X.shape
# interpolation='nearest' : default, 'bilinear' 두가지 방식 제공
model = keras.Sequential()
model.add(layers.UpSampling2D(size=2, input_shape=(2,2,1)))
model.summary() # (1, 2, 2, 1)
X_up = model.predict(X)
X_up.shape, X.shape # ((1, 4, 4, 1), (1, 2, 2, 1))
X_up.squeeze()
# interpolation='nearest' : default, 'bilinear' 두가지 방식 제공
model = keras.Sequential()
# 'bilinear'은 선형 보간법의 일종으로, 주어진 4개의 가장 가까운 픽셀 값을 사용하여 새로운 픽셀 값을 추정하는 방법
# 업샘플링할 때 이미지의 픽셀 값을 채우는데, 이웃하는 4개의 픽셀 값으로부터 새로운 픽셀 값을 계산하는 방식이기 때문에 비교적 계산량이 작으면서도 자연스러운 이미지를 얻을 수 있다.
model.add(layers.UpSampling2D(size=5, interpolation='bilinear', input_shape=(2,2,1)))
model.summary()
X_up = model.predict(X)
X_up.shape, X.shape # ((1, 10, 10, 1), (1, 2, 2, 1))
X_up.squeeze()
Transpose Convolution
model = keras.Sequential()
model.add(layers.Conv2DTranspose(kernel_size=3, filters=12, strides=2, padding='same', input_shape=(2,2,1)))
X_up2 = model.predict(X)
X.shape, X_up2.shape # ((1, 2, 2, 1), (1, 4, 4, 12))
X_up2[0]
DCGAN
img_shape = (28, 28, 1) # 판별자의 input shape
z_dim = 100
생성자
def create_generator(z_dim=100):
model = keras.Sequential()
model.add(layers.Dense(7*7*256, input_shape=(z_dim, )))
model.add(layers.Reshape((7, 7, 256)))
# image size는 두배씩 늘리고 channel은 두배씩 줄인다.
# Convolution block : Conv2DTranspose -> BatchNormalization -> Activation(LeakyReLU)
model.add(layers.Conv2DTranspose(filters=128, kernel_size=3, strides=2, padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.LeakyReLU(alpha=0.01))
# 채널만 절반으로 줄임.
model.add(layers.Conv2DTranspose(filters=64, kernel_size=3, strides=1, padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.LeakyReLU(alpha=0.01))
# size는 절반으로 늘림. 14 X 14 => 28 X 28, channle: 1 (grayscale)
model.add(layers.Conv2DTranspose(filters=1, kernel_size=3, strides=2, padding='same'))
model.add(layers.Activation("tanh"))
return model
판별자
def create_discriminator(img_shape):
model = keras.Sequential()
# pooling layer를 사용하지 않고 stride를 이용해서 size를 줄인다.
# size를 절반씩 줄여나간다. filter(channel)는 32->64->128 늘린다.
model.add(layers.Conv2D(filters=32, kernel_size=3, strides=2, padding='same',
input_shape=img_shape))
model.add(layers.LeakyReLU(alpha=0.01))
model.add(layers.Conv2D(filters=64, kernel_size=3, strides=2,padding='same'))
model.add(layers.LeakyReLU(alpha=0.01))
model.add(layers.Conv2D(filters=128, kernel_size=3, strides=2,padding='same'))
model.add(layers.LeakyReLU(alpha=0.01))
model.add(layers.Flatten())
model.add(layers.Dense(1, activation='sigmoid')) # 출력: 이진분류 - 0:fake, 1:real
return model
LeakyReLU란
⇒ GAN(Generative Adversarial Networks)과 같이 비선형성을 강조하는 모델에서 LeakyReLU를 사용하여 좀 더 안정적인 학습을 도모하는 경우가 많다.
GAN 모델
def create_gan(generator, discriminator):
model = keras.Sequential()
model.add(generator)
model.add(discriminator)
return model
모델 생성 및 컴파일
# 판별자 생성 + 컴파일
discriminator = create_discriminator(img_shape) #(28, 28, 1)
discriminator.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# 생성자 생성
generator = create_generator(z_dim)
discriminator.trainable=False # GAN 모델의 discriminator(layer)를 Frozen(학습할때 weight가 update되지 않게함.)
# GAN 모델 생성
gan = create_gan(generator, discriminator)
gan.compile(optimizer='adam', loss='binary_crossentropy')
훈련
import matplotlib.pyplot as plt
def sample_images(generator, image_grid_row=4, image_grid_col=4):
"""
Generator를 이용해 가짜 이미지를 생성해 그리는 함수.
그리드 행, 열의 개수를 받아 행 * 열 개수만큼 그린다.
[매개변수]
generator: Generator 모델
image_grid_rows: 이미지를 그릴 grid 행수 (기본값 : 4)
image_grid_columns: 이미지를 그릴 grid 열수(기본값 : 4)
"""
z = np.random.normal(0, 1, (image_grid_row*image_grid_col, z_dim))
gen_images = generator.predict(z)
plt.figure(figsize=(7,7))
for i in range(image_grid_row * image_grid_col):
plt.subplot(image_grid_row, image_grid_col, i+1)
plt.imshow(gen_images[i, :, :, 0], cmap='gray')
plt.axis('off')
plt.show()
sample_images(generator)
**training 함수**
loss_list = [] # step 별 loss를 저장할 리스트
acc_list = [] # step 별 acc를 저장할 리스트
iteration_list = [] # loss, acc를 저장할 step 수를 저장할 리스트
def train(train_image, iterations, batch_size, sample_interval):
"""
[parameter]
train_image: 진짜 이미지데이터셋
iterations : 총 step수
batch_size : batch size
sample_interval: 몇 iteration당 한번씩 훈결결과를 출력/저장할지 간격
"""
train_image = train_image/127.5-1 # -1 ~ 1사이로 scaling # 전처리
train_image = train_image[..., np.newaxis] #채널 차원 증가. (28, 28) => (28, 28, 1)
# Label 생성: fake-0, real: 1
real = np.ones((batch_size, 1))
fake = np.zeros((batch_size, 1))
# 학습.
for iteration in range(iterations):
#################################### 판별자 훈련
# 정답에서 추출할 이미지의 index를 random 함수를 이용해 batch 개수만큼 조회
idx = np.random.randint(0, train_image.shape[0], batch_size)
# 학습에 사용할 정답 이미지들 조회
real_imgs = train_image[idx]
# Fake image를 만들기 위해 generator에 넣어줄 잡음 생성.
z = np.random.normal(0,1, (batch_size, 100))
# Generator를 이용해 Fake image 생성
gen_imgs = generator.predict(z)
#진짜 이미지로 학습
d_loss_acc_real = discriminator.train_on_batch(real_imgs, real) # 한스텝 학습시키는 메소드
#생성자가 만든 가짜 이미지로 학습
d_loss_acc_fake = discriminator.train_on_batch(gen_imgs, fake)
d_loss, acc = np.add(d_loss_acc_real, d_loss_acc_fake)*0.5
####################################생성자 훈련 - gan을 이용해서 훈련.
z = np.random.normal(0, 1, (batch_size, 100))
gan_loss = gan.train_on_batch(z, real) # input으로 잡음과 정답을 전달
# 중간결과 확인
if iteration % sample_interval == 0:
loss_list.append((d_loss, gan_loss))
acc_list.append(acc)
iteration_list.append(iteration)
print(f'판별자 손실:{d_loss}, 판별자정확도:{acc}, gan 손실:{gan_loss}')
sample_images(generator)
import time
(X_train, y_train), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
iterations = 10000
batch_size=100
sample_interval=500
start = time.time()
train(X_train, iterations, batch_size, sample_interval)
end = time.time()
print((end-start)/60, '분')
이미지 생성
import tensorflow as tf
save_path = '/content/drive/MyDrive/saved_model/fashion_mnist_gan'
saved_generator = tf.keras.models.load_model(save_path)
z = np.random.normal(0, 1, (3, 100))
pred = saved_generator.predict(z)
plt.figure(figsize=(10,5))
for i in range(3):
plt.subplot(1, 3, i+1)
plt.imshow(pred[i].reshape(28,28), cmap='gray')
plt.axis('off')
plt.show()