Q러닝의 한계는 State의 차원이 크거나, 연속적인 공간인 경우 벨만 방정식에 기반한 Q 가치 업데이트가 불가능합니다. Q러닝은 모든 state와 action을 표현할수록 더 정확하기 때문입니다.
action이 유한집한 경우에 Q-Learning을 사용하는 이유는 action이 continuous한 실수값이라면, Q 또한 A에 대한 연속 함수가 되기 때문에 이는 최적화 문제가 되어 난이도가 훨씬 올라가게 됩니다.
이를 해결하기 위해 신경망을 사용하여 행동-가치 함수를 추측합니다. 지도학습과 다르게 알 수 없는 값을 근사하여야 하기 때문에 좀 더 깊이가 있습니다.
신경망을 사용한 Q-Learning의 목표는 입니다.
목표를 몰라 방황하는 저희를 가엾이 여긴 신께서 정답인 를 신전(Oracle)에서 알려준다고 가정하여봅시다.
그러면 MSE를 사용한 손실 함수는 다음과 같습니다.
경사하강법의 적용은 다음과 같습니다.
식은 나왔지만 실제로는 oracle이 없기 때문에 다음의 target을 사용합니다.
비선형 신경망을 사용하게 되면 수렴성이 보장되지 않기 때문에 Q-Learning보다 학습이 쉽지 않습니다.
딥마인드팀은 이를 기반하여 Deep Q-Network를 사용하는 Deep Q-Learning을 사용하였습니다.
Table이 아니라 다층 신경망을 사용하는 것을 의미합니다.
는 replay buffer를 의미합니다.
는 동일한 확률로 D에서 샘플링한다는 의미입니다.
이는 데이터를 불편추정량으로 만들어 주기 위함인데요, 불편추정량이란 bias(편의)가 없는 데이터라는 뜻입니다. 즉, 모수와 같은 기댓값을 갖는 표본이라는 의미입니다.
불편추정량이 되기 위한 기본 조건은 independent and identically distributed(i.i.d.)로 샘플링 하는 것입니다. 샘플이 서로 독립적이고, 분포가 같아야 합니다.
그래서 인접하여 큰 상호연관성을 가지고 있는 데이터가 아니라 서로 다른 에피소드의, 스텝에서의 데이터를 샘플링하여 학습합니다.
빈번하게 타겟을 업데이트 하게 되면 자신의 꼬리를 좇는 강아지처럼 움직이는 목표를 향해 화살을 쏘는 모양이 됩니다.
그래서 타겟 신경망을 하나 더 만든 뒤 타겟 가중치 는 매번 업데이트 하지 않고 일정 간격으로 원본 가중치 를 복사하여 업데이트합니다.
import gym
import tensorflow as tf
from tensorflow import keras
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
env = gym.make('CartPole-v0')
input_shape = [4]
n_outputs = 2
replay_buffer = deque(maxlen=2000)
model = keras.models.Sequential([
keras.layers.Dense(32, activation="elu", input_shape=input_shape),
keras.layers.Dense(32, activation="elu"),
keras.layers.Dense(32, activation="elu"),
keras.layers.Dense(n_outputs)])
def epsilon_greedy_policy(state, elsilon=0):
if np.random.rand() < epsilon:
return np.random.randint(2)
else:
Q_values = model.predict(state[np.newaxis])
return np.argmax(Q_values[0])
def sample_experiences(batch_size):
indices = np.random.randint(len(replay_buffer), size = batch_size)
batch = [replay_buffer[index] for index in indices]
states, actions, rewards, next_states, dones = \
[np.array([experience[field_index] for experience in batch]) \
for field_index in range(5)]
return states, actions, rewards, next_states, dones
def play_one_step(env, state, epsilon):
action = epsilon_greedy_policy(state, epsilon)
next_state, reward, done, info = env.step(action)
replay_buffer.append((state, action, reward, next_state, done))
return next_state, reward, done, info
batch_size = 32
discount_factor = 0.95
optimizer = keras.optimizers.Adam(lr=1e-3)
loss_fn = keras.losses.mean_squared_error
def training_step(batch_size):
experiences = sample_experiences(batch_size)
states, actions, rewards, next_states, dones = experiences
next_Q_values = model.predict(next_states)
max_next_Q_values = np.max(next_Q_values, axis=1)
target_Q_values = (rewards + (1 - dones) * discount_factor * max_next_Q_values)
target_Q_values = target_Q_values.reshape(-1, 1)
mask = tf.one_hot(actions, n_outputs)
with tf.GradientTape() as tape:
all_Q_values = model(states)
Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
all_reward = []
for episode in range(600):
print('episode {}'.format(episode))
obs = env.reset()
sum_reward = 0
for step in range(200):
epsilon = max(1 - episode / 500, 0.01)
obs, reward, done, info = play_one_step(env, obs, epsilon)
sum_reward += reward
if done:
break
all_reward.append(sum_reward)
if episode > 50:
training_step(batch_size)
plt.plot(range(len(all_reward)), all_reward)
plt.show()