Policy Iteration(code)

Seungsoo Lee·2022년 7월 7일
0

RL

목록 보기
11/15

date: 2021-10-18 19:00:00


(https://github.com/rlcode/reinforcement-learning-kr-v2) 의 코드를 참고하였습니다

이 코드를 이해하려면 앞선 policy iteration 포스트를 읽어주세요.

설명은 주석으로 해두었습니다.


Vπ(s) = E[Rt+1 + γVπ(St+1) | St = s]

아래의 과정을 코드로 표현한다.

  1. 처음에는 random policy(무작위 정책)로 정책을 설정한다.
  2. 모든 state에 대해서 정해진 업데이트 된 현재 policy를 통해 모든 state의 value값을 정하는 policy evaluation(정책 평가)를 한다.
  3. 모든 state의 value를 바탕으로 모든 state의 policy를 업데이트를 하는 policy improvement(정책 발전)을 한다.
  4. 2 ~ 3을 반복한다.



policy_iteration.py

5 x 5 grid world의 예제이다.


PolicyIeration 클래스의 초기화 부분을 보겠다

class PolicyIteration:
    def __init__(self, env):
        # 환경에 대한 객체 선언
        self.env = env
        # 가치함수를 2차원 리스트로 초기화 (25개)
        self.value_table = [[0.0] * env.width for _ in range(env.height)]
        # 상 하 좌 우 동일한 확률로 정책 초기화 (100개)
        # 한 state 에 4가지 policy가 있기 때문
        self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                            for _ in range(env.height)]
        # 마침 상태의 설정
        # goal 인 지점(2, 2)
        self.policy_table[2][2] = []
        # 할인율
        self.discount_factor = 0.9

policy_evaluation 함수에 대해서 보겠다.

# 벨만 기대 방정식을 통해 다음 가치함수를 계산하는 정책 평가
    def policy_evaluation(self):
        # 다음 가치함수 초기화
        next_value_table = [[0.00] * self.env.width
                           for _ in range(self.env.height)]

        # 모든 상태에 대해서 벨만 기대방정식을 계산
        for state in self.env.get_all_states():
          	# 처음 value 는 일단 0으로 초기화
            value = 0.0
            # 마침 상태의 가치 함수 = 0
            if state == [2, 2]:
                next_value_table[state[0]][state[1]] = value
                # 바로 다음 state로 간다
                continue

            # 벨만 기대 방정식
            # 2, 2가 아닌 경우 현재 state에서 가능한 action 4가지를 모두 반복함
            for action in self.env.possible_actions:
              	# 현재 반복중인 state에서 해당 action을 하면 다음 state는 무엇일지 next_state에 넣음
                next_state = self.env.state_after_action(state, action)
                # 현재 반복중인 state에서 해당 action을 하면 다음 state에 있는 reward가 얼마일지 reward에 넣음
                reward = self.env.get_reward(state, action)
                # next_state에 있는 현재 timestep의 value값을 next_value에다 넣음
                next_value = self.get_value(next_state)
                # 이게 벨만 기대 방정식, 이 반복문을 돌면서 각 action에 대한 q 값들을 모두 벨만 기대방정식에 의해 합해 주는 과정
                value += (self.get_policy(state)[action] *
                          (reward + self.discount_factor * next_value))
						# (state[0], state[1]) 지점의 state(현재 반복에서의 state)에 계산한 value를 next_value_table에넣어줌
            next_value_table[state[0]][state[1]] = value
				
        # 각 state에 해당되는 모든 next_value를 인스턴스 변수인 self.value_table에 업데이트를 시켜준다
        self.value_table = next_value_table

policy_improvement 함수에 대해서 알아보겠다.

# 현재 가치 함수에 대해서 탐욕 정책 발전
    def policy_improvement(self):
    		# 아까 __init__에서 초기화 해주었던 인스턴스 변수인 self.policy_table를 next_policy에다가 넣음
        next_policy = self.policy_table
        # 모든 state에 대해서 반복을 돌림
        for state in self.env.get_all_states():
          	# 마침 상태가 현재 반복의 state라면 아무것도 안하고 continue를 통해 다음 state 로 반복을 이어간다.
            if state == [2, 2]:
                continue
            
            value_list = []
            # 반환할 정책 초기화
            result = [0.0, 0.0, 0.0, 0.0]

            # 모든 행동에 대해서 [보상 + (할인율 * 다음 상태 가치함수) = Qvalue] 계산
            for index, action in enumerate(self.env.possible_actions):
              	# 현재 반복중인 state에서 해당 action을 하면 다음 state는 무엇일지 next_state에 넣음
                next_state = self.env.state_after_action(state, action)
                # 현재 반복중인 state에서 해당 action을 하면 다음 state에 있는 reward가 얼마일지 reward에 넣음
                reward = self.env.get_reward(state, action)
                # next_state에 있는 현재 timestep의 value값을 next_value에다 넣음
                next_value = self.get_value(next_state)
                # 이번엔 각 action에 대한 q value들을 value_list에다가 추가를 해준다.
                value = reward + self.discount_factor * next_value
                value_list.append(value)

            # 받을 보상이 최대인 행동들에 대해 탐욕 정책 발전
            # 지금 현재 반복중인 state에서의 모든 action에 대한 q값을 비교한후 가장 큰 값의 index를 max_idx_list에 넣음
            max_idx_list = np.argwhere(value_list == np.amax(value_list))
            max_idx_list = max_idx_list.flatten().tolist()
            # 만약 그 q값들중 같은 값들이 있을때 확률이 같게 나누어 준다.
            prob = 1 / len(max_idx_list)

            # result에 각 action에 대한 policy를 넣어준다.
            for idx in max_idx_list:
                result[idx] = prob
						
            # next_policy에다 현재 반복중인 state의 policy를 업데이트 해준다.
            next_policy[state[0]][state[1]] = result

        # 각 state에서 모든 action에 해당되는 next_policy를 인스턴스 변수인 self.policy_table에 업데이트를 시켜준다
        self.policy_table = next_policy

제가 올린 글에서 잘못된 부분이 있으면 제 메일로 연락주세요!

Reference : https://github.com/rlcode/reinforcement-learning-kr-v2


이승수의 저작물인 이 저작물은(는) 크리에이티브 커먼즈 저작자표시-비영리-동일조건변경허락 4.0 국제 라이선스에 따라 이용할 수 있습니다.

1개의 댓글

comment-user-thumbnail
2022년 12월 14일

결과적으로 moto x3m 개발 워크플로가 더욱 효율적이 됩니다.

답글 달기