FLY AI 4기 9일차: 강화학습

염지현·2024년 1월 8일

FLY AI 4기

목록 보기
10/16

9일차: 강화학습

9일차 요약

  • 오전: 강화학습 이론 및 back-propagation 실습
  • 오후: 강화학습 실습
    - MDP Grid World 실습

Backpropagation 실습

import math
import torch
from torch.autograd import Variable

x = Variable(
 torch.tensor(1., dtype=torch.float32),
 requires_grad=True)
y = Variable(
 torch.tensor(1., dtype=torch.float32),
 requires_grad=True)

optimizer = torch.optim.Adam(params=[x,y], lr=0.01)

EPOCHS = 100
for epoch in range(EPOCHS):
  f = 2*(x**2) + (y**2) + (math.exp(4*x*y))
  optimizer.zero_grad()
  f.backward()
  optimizer.step()
  
print(x, y)
print(f)

MDP 실습: GIRD WORLD

  • Grid world가 다음과 같이 주어졌을 때 정답 길 찾기!
  • 직접 state transition matrix, reward matrix 만들어 봄

matrix p 생성

  • 행은 현재 내 상태, 열은 이동 가능한 확률을 의미함
  • #up은 행 i번에서 위로 올라갈 수 있는지
    - 위에가 막혀있을 경우 자기 자신 스스로이기 때문에 자기 자신한테 1이라고 표현
    • 따라서 4번 인덱스로 위로 올라갈 경우 1번 인덱스에 도달하기 때문에 4,1에 1이라고 작성
  • #down, #right, #left도 동일
import numpy as np

P = np.array([
    # up
    # 0   1  2  3  4  5  6  7  8  9  10 11
    [ [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
      [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]],

    # down
    [ [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
      [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]],

    # left
    [ [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]],

    # right
    [ [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]]], dtype='float32')

P.shape
P = P.transpose(1, 0, 2)

Reward matrix

  • 위로 갈 때 finish index에 도달할 경우 1, 도달하지 못할 경우 0, 자기 자신일 경우 1
  • 따라서 해당 문제에서는 10번 인덱스에서 위로 올라가야지만 finish index에 도달할 수 있기 때문에 첫번째 행에서 10번 열만 1이 됨
R = np.array([
    [-1, -1, -1,  -1, -1, -1, -1, -1, -1, -1, 1, 0], # up
    [-1, -1, -1,  -1, 1, 1, 1, -1, -1, -1, -1, 0], # down
    [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0], # left
    [-1, -1, -1,  -1, -1, -1, -1, -1, -1, -1, -1, 0]], #right
    dtype='float32')
    
R = R.transpose()

MDP

pi = np.ones((12, 4), dtype='float32') * 0.25

def policy_eval(P, R, pi, maxiter=30):
    V = np.zeros((maxiter, 12), dtype='float32')

    for i in range(maxiter-1):
        V[i+1] = np.squeeze(
            np.matmul(
                np.expand_dims( pi, 1 ),
                np.expand_dims( R + 0.6 * np.dot(P, V[i]), 2 )))

    return V[maxiter-1]

def policy_upd(P, R, v):
    print(np.squeeze(np.expand_dims( R + 0.6 * np.dot(P, v), 2 )))
    a_idx = np.argmax(np.squeeze(np.expand_dims( R + 0.6 * np.dot(P, v), 2 )), axis=1)
    pi = np.zeros((12, 4), dtype='float32')
    pi[range(12), a_idx] = 1.
    return pi

pi_old = None
pi = np.ones((12, 4), dtype='float32') * 0.25

while not np.all(np.equal(pi_old, pi)):
    pi_old = pi.copy()
    v = policy_eval(P, R, pi)
    pi = policy_upd(P, R, v)

print(pi)

결과값

0개의 댓글