Pytorch DQN 구현시 주의점
- 미니 배치 사용
Experience Replay 및 유사 Fixed Tager Q-Network를 구현하기 위해 미니 배치 학습 적용
- DQN은 각 단계의 trasnsition(상태 st, 행동 at, 그다음 상태 st+1, 보상rt+1을 메모리 객체에 저장
- 미니배치는 이 메모리 객체에서 여러 단계 분량에 해당하는
transition을 무작위로 꺼낸 것
- 게임 종료에 따라 다음 상태가 존재하지 않을 수 있으니 분기하도록 구현
- 구현 편의를 위해 Fixed Target Q-Network를 제대로 구현하는 target Q-Network를 구현하지 않고 main-network를 사용한다
- 미니배치를 다루는 방법
- 변수의 데이터 타입
- MNIST 데이터와는 달리 OpenAI Gym의 CartPole과 파이토치 신경망이 서로 데이터를 주고받아야 한다
- CartPole은 Numpy 타입으로 변수를 다루는데 Torch에서는 Torch.Tensor를 다룬다
- 변수의 크기
- Torch.Tensor의 크기에 주의, 크기 1과 크기 1 * 1은 의미적으론 같지만 연산이 안될 수도
- namedtuple을 사용한다는 점
- CartPole에서 관측된 상태 변수 값에 이름을 붙여 저장해둘 수 있다.
DQN 구현
import numpy as numpy
import matplotlib.pyplot as plt
%matplotlib inline
import gym
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display
def display_frames_as_gif(frames):
'''
Displays a list of frames as a gif, with controls
'''
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('./movie_DQN.gif')
from collections import namedtuple
Tr = namedtuple('tr', ('name_a', 'value_b'))
Tr_object = Tr('이름A', 100)
print(Tr_object)
print(Tr_object.value_b)

namedtuple 생성
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
상수 정의
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
ReplayMemory 클래스 정의 : 미니배치 학습에서 경험 데이터를 저장
push : 해당 단계의 transition을 저장
sample : 무작위 transition을 꺼내옴
len : 저장된 transtition의 개수 조회
- 저장된 transition의 개수가 CAPACITY개를 초과하면 오래된 것부터 지움
import random
class ReplayMemory:
def __init__(self, CAPACITY):
self.capacity = CAPACITY
self.memory = []
self.index = 0
def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward) 메모리 저장'''
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state, action, state_next, reward)
self.index = (self.index + 1) % self.capacity
def sample(self, batch_size):
'''batch_size 개수만큼 무작위로 저장된 transition을 추출'''
return random.sample(self.memory, batch_size)
def __len__(self):
'''len 함수로 현재 저장된 transition 개수를 반환'''
return len(self.memory)
Brain 클래스 구현
- 신경망으로 Q함수를 나타낼 것
replay : 메모리 클래스에서 미니배치를 꺼내와서 신경망의 결합 가중치를 학습, Q함수 수정
decide_action : ϵ-greedy 알고리즘으로 무작위 선택된 행동 혹은 현재 상태에서 현재 Q값이 최대가 되는 행동을 선택해서 그 행동의 인덱스를 반환
import numpy as np
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
BATCH_SIZE = 32
CAPACITY = 10000
class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions
self.memory = ReplayMemory(CAPACITY)
self.model = nn.Sequential()
self.model.add_module('fc1', nn.Linear(num_states, 32))
self.model.add_module('relu1', nn.ReLU())
self.model.add_module('fc2', nn.Linear(32, 32))
self.model.add_module('relu2', nn.ReLU())
self.model.add_module('fc3', nn.Linear(32, num_actions))
print(self.model)
self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
def replay(self):
'''Experience Replay로 신경망의 결합 가중치 학습'''
if len(self.memory) < BATCH_SIZE :
return
transition = self.memory.sample(BATCH_SIZE)
batch = Transition(*zip(*transition))
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
self.model.eval()
state_action_values = self.model(state_batch).gather(1, action_batch)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), dtype=torch.bool)
next_state_values = torch.zeros(BATCH_SIZE)
next_state_values[non_final_mask] = self.model(
non_final_next_states
).max(1)[0].detach()
expected_state_action_values = reward_batch + GAMMA * next_state_values
self.model.train()
loss = F.smooth_l1_loss(state_action_values,
expected_state_action_values.unsqueeze(1))
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def decide_action(self, state, episode):
'''현재 상태에 따라 행동을 결정한다'''
epsilon = 0.5 * (1 / (episode + 1))
if epsilon <= np.random.uniform(0, 1):
self.model.eval()
with torch.no_grad():
action = self.model(state).max(1)[1].view(1, 1)
else :
action = torch.LongTensor(
[[random.randrange(self.num_actions)]])
return action
- 저장된 transition 수 확인
ReplayMemory의 크기를 통해 transition의 수 확인. 미니배치 만큼 모이지 않았다면 처리를 중단
- 미니배치 생성
미니배치 만큼 transition을 꺼내오고 학습 가능하도록 변환
- 정답신호로 사용할 Q(st,at) 계산
Agent 클래스 정의
class Agent :
def __init__(self, num_states, num_actions):
'''태스크의 상태 및 행동의 가짓수를 설정'''
self.brain = Brain(num_states, num_actions)
def update_q_function(self):
'''Q함수를 수정'''
self.brain.replay()
def get_action(self, state, episode):
'''행동을 결정'''
action = self.brain.decide_action(state, episode)
return action
def memorize(self, state, action, state_next, reward):
'''memory 객체에 state, action, state_next, reward 저장'''
self.brain.memory.push(state, action, state_next, reward)
Environment 클래스 정의
class Environment:
def __init__(self):
self.env = gym.make(ENV, render_mode = 'rgb_array')
num_states = self.env.observation_space.shape[0]
num_actions = self.env.action_space.n
self.agent = Agent(num_states, num_actions)
def run(self):
'''실행'''
episode_10_list = np.zeros(10)
complete_episodes = 0
episode_final = False
frames = []
for episode in range(NUM_EPISODES):
observation = self.env.reset()
state = observation
state = torch.from_numpy(state[0]).type(
torch.FloatTensor
)
state = torch.unsqueeze(state, 0)
for step in range(MAX_STEPS):
if episode_final is True :
frames.append(self.env.render())
action = self.agent.get_action(state, episode)
observation_next, _, done, _, _ = self.env.step(
action.item()
)
if done :
state_next = None
episode_10_list = np.hstack(
(episode_10_list[1:], step+1)
)
if step < 160:
reward = torch.FloatTensor(
[-1.0]
)
complete_episodes = 0
else :
reward = torch.FloatTensor([1.0])
complete_episodes += 1
else :
reward = torch.FloatTensor([0.0])
state_next = observation_next
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor
)
state_next = torch.unsqueeze(state_next, 0)
self.agent.memorize(state, action, state_next, reward)
self.agent.update_q_function()
state = state_next
if done:
print('%d Episode : Finished after %d steps : 최근 10 에피소드의 평균 단계 수 = %.1lf' % (episode, step + 1, episode_10_list.mean()))
break
if episode_final is True:
display_frames_as_gif(frames)
break
if complete_episodes >= 10:
print("10 연속 에피소드 성공")
episode_final = True
