강화학습 예제

짬그브·2025년 4월 18일

DQN 예제

모델 생성 후 학습

import gymnasium as gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import matplotlib.pyplot as plt
import os

# Determine the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        super(QNetwork, self).__init__()
        #########################################################################################
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)
        #########################################################################################

    def forward(self, x):
    #########################################################################################
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
    #########################################################################################

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def add(self, experience):
        self.memory.append(experience)

    def sample(self):
    #########################################################################################
        experiences = random.sample(self.memory, k=self.batch_size)
        states, actions, rewards, next_states, dones = zip(*experiences)
        states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
        actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(-1).to(device)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(-1).to(device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
        dones = torch.tensor(dones, dtype=torch.uint8).unsqueeze(-1).to(device)
        return states, actions, rewards, next_states, dones
    #########################################################################################

    def __len__(self):
        return len(self.memory)

class DQNAgent:
    def __init__(self, state_size, action_size, seed, buffer_size=100000, batch_size=64, gamma=0.99, lr=0.001, tau=0.01, update_every=4):
    #########################################################################################
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)
        self.gamma = gamma
        self.tau = tau
        self.update_every = update_every
        self.batch_size = batch_size

        self.qnetwork_local = QNetwork(state_size, action_size).to(device)
        self.qnetwork_target = QNetwork(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=lr)

        self.memory = ReplayBuffer(buffer_size, batch_size)
        self.t_step = 0

    #########################################################################################

    def step(self, state, action, reward, next_state, done):
    #########################################################################################
        self.memory.add((state, action, reward, next_state, done))
        self.t_step = (self.t_step +1) % self.update_every
        if self.t_step == 0:
            if len(self.memory) > self.batch_size:
                experiences = self.memory.sample()
                self.learn(experiences)

    #########################################################################################

    def action(self, state, eps=0.):
    #########################################################################################
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()
        if random.random() > eps:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    #########################################################################################

    def learn(self, experiences):
    #########################################################################################
        states, actions, rewards, next_states, dones = experiences
        Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
        Q_targets = rewards + (self.gamma * Q_targets_next * (1-dones))
        Q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = F.mse_loss(Q_expected, Q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.qnetwork_local, self.qnetwork_target, self.tau)


    #########################################################################################


    def soft_update(self, local_model, target_model, tau):
    #########################################################################################
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

    #########################################################################################


# Check if the model file exists
load_model = os.path.isfile('dqn_mountaincar.pth')

# Create and train the DQN Agent
env = gym.make('MountainCar-v0', render_mode="rgb_array")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
seed = 0
agent = DQNAgent(state_size, action_size, seed)

n_episodes = 10000
max_t = 200
eps_start = 1.0
eps_end = 0.01
eps_decay = 0.995

scores = []
if load_model:
    #########################################################################################
    env = gym.make('MountainCar-v0', render_mode='human')
    agent.qnetwork_local.load_state_dict(torch.load('dqn_mountaincar.pth',map_location=device))
    print("Model loaded.")
    #########################################################################################
else:
    for i_episode in range(1, n_episodes + 1):
        #########################################################################################
        state = env.reset()[0]
        eps = max(eps_end, eps_start * (eps_decay ** i_episode))
        score = 0

        for t in range(max_t):
            action = agent.action(state, eps)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated | truncated
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break

        #########################################################################################

        scores.append(score)
        print(f"Episode {i_episode}/{n_episodes} - Score: {score:.2f}")

    # Save the trained model
    torch.save(agent.qnetwork_local.state_dict(), 'dqn_mountaincar.pth')
    print("Model saved.")



# Render the trained agent
state = env.reset()[0]
env.render()
done = False
while not done:
    action = agent.action(state, eps=0.0)  # Use epsilon=0 for deterministic actions
    state, _, terminated, truncated, _ = env.step(action)
    done = terminated | truncated
    env.render()
env.close()

# Plotting the rewards if training was done
if not load_model:
    plt.plot(np.arange(1, n_episodes + 1), scores)
    plt.ylabel('Score')
    plt.xlabel('Episode')
    plt.title('Score per Episode')
    plt.show()

Reinforce

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym

# Define the policy network
class PolicyNetwork(nn.Module):
    def __init__(self, state_space, action_space):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_space, 128)
        self.fc2 = nn.Linear(128, action_space)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=-1)
        return x

# Function to select an action
def select_action(policy, state):
    state = torch.FloatTensor(state).unsqueeze(0)
    probs = policy(state)
    m = torch.distributions.Categorical(probs)
    action = m.sample()
    log_prob = m.log_prob(action)
    return action.item(), log_prob

# Function to update the policy
def update_policy(optimizer, log_probs, rewards, gamma):
    discounted_rewards = []
    R = 0
    for r in rewards[::-1]:
        R = r + gamma * R
        discounted_rewards.insert(0, R)
    
    discounted_rewards = torch.FloatTensor(discounted_rewards)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-9)

    policy_loss = []
    for log_prob, reward in zip(log_probs, discounted_rewards):
        policy_loss.append(-log_prob * reward)
    
    optimizer.zero_grad()
    policy_loss = torch.cat(policy_loss).sum()
    policy_loss.backward()
    optimizer.step()

# Main function to train the policy
def train_reinforce(env_name, num_episodes, gamma, learning_rate, model_path, max_steps_per_episode):
    env = gym.make(env_name)
    state_space = env.observation_space.shape[0]
    action_space = env.action_space.n

    policy = PolicyNetwork(state_space, action_space)
    optimizer = optim.Adam(policy.parameters(), lr=learning_rate)

    if os.path.exists(model_path):
        policy.load_state_dict(torch.load(model_path))
        print("Loaded existing model.")
        test_model(env, policy)
        return
    else:
        print("Training a new model.")

    for episode in range(num_episodes):
        state = env.reset()[0]
        log_probs = []
        rewards = []

        for step in range(max_steps_per_episode):
            action, log_prob = select_action(policy, state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated | truncated
            log_probs.append(log_prob)
            rewards.append(reward)
            state = next_state

            if done:
                break

        update_policy(optimizer, log_probs, rewards, gamma)
        print(f"Episode {episode + 1}/{num_episodes} finished with total reward {sum(rewards)}")

    torch.save(policy.state_dict(), model_path)
    print(f"Model saved to {model_path}")

def test_model(env, policy):
    env = gym.make("CartPole-v1",render_mode="human")
    state = env.reset()[0]
    done = False
    total_reward = 0
    while not done:
        env.render()
        action, _ = select_action(policy, state)
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated
        total_reward += reward
    env.close()
    print(f"Test completed with total reward {total_reward}")

if __name__ == "__main__":
    ENV_NAME = "CartPole-v1"
    NUM_EPISODES = 1000
    GAMMA = 0.99
    LEARNING_RATE = 0.01
    MODEL_PATH = "reinforce_cartpole.pth"
    MAX_STEPS_PER_EPISODE = 1000  # Set a limit on the maximum number of steps per episode

    train_reinforce(ENV_NAME, NUM_EPISODES, GAMMA, LEARNING_RATE, MODEL_PATH, MAX_STEPS_PER_EPISODE)

a2c 예제

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym
from torch.distributions import Categorical

# Actor Network
class Actor(nn.Module):
    def __init__(self, input_size, num_actions):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=-1)
        return x

# Critic Network
class Critic(nn.Module):
    def __init__(self, input_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

def save_model(actor, critic, actor_path, critic_path):
    torch.save(actor.state_dict(), actor_path)
    torch.save(critic.state_dict(), critic_path)
    print(f"Saved actor model to {actor_path}")
    print(f"Saved critic model to {critic_path}")

def load_model(actor, critic, actor_path, critic_path):
    actor.load_state_dict(torch.load(actor_path))
    critic.load_state_dict(torch.load(critic_path))
    actor.eval()
    critic.eval()
    print(f"Loaded actor model from {actor_path}")
    print(f"Loaded critic model from {critic_path}")

def actor_critic(actor, critic, episodes, max_steps=2000, gamma=0.99, lr_actor=1e-3, lr_critic=1e-3):
    optimizer_actor = optim.AdamW(actor.parameters(), lr=lr_actor)
    optimizer_critic = optim.AdamW(critic.parameters(), lr=lr_critic)
    stats = {'Actor Loss': [], 'Critic Loss': [], 'Returns': []}

    env = gym.make('CartPole-v1')

    for episode in range(1, episodes + 1):
        state = env.reset()[0]
        ep_return = 0
        done = False
        step_count = 0

        while not done and step_count < max_steps:
            state_tensor = torch.FloatTensor(state)
            
            # Actor selects action
            action_probs = actor(state_tensor)
            dist = Categorical(action_probs)
            action = dist.sample()
            
            # Take action and observe next state and reward
            next_state, reward, terminated, truncated,_ = env.step(action.item())
            done = terminated | truncated
            # Critic estimates value function
            value = critic(state_tensor)
            next_value = critic(torch.FloatTensor(next_state))
            
            # Calculate TD target and Advantage
            td_target = reward + gamma * next_value * (1 - done)
            advantage = td_target - value
            
            # Critic update with MSE loss
            critic_loss = F.mse_loss(value, td_target.detach())
            optimizer_critic.zero_grad()
            critic_loss.backward()
            optimizer_critic.step()
            
            # Actor update
            log_prob = dist.log_prob(action)
            actor_loss = -log_prob * advantage.detach()
            optimizer_actor.zero_grad()
            actor_loss.backward()
            optimizer_actor.step()
            
            # Update state, episode return, and step count
            state = next_state
            ep_return += reward
            step_count += 1

        # Record statistics
        stats['Actor Loss'].append(actor_loss.item())
        stats['Critic Loss'].append(critic_loss.item())
        stats['Returns'].append(ep_return)

        # Print episode statistics
        print(f"Episode {episode}: Actor Loss: {actor_loss.item():.4f}, Critic Loss: {critic_loss.item():.4f}, Return: {ep_return}, Steps: {step_count}")

    env.close()
    return stats

# Main training loop
if __name__ == "__main__":
    actor_path = "actor_model.pth"
    critic_path = "critic_model.pth"
    
    # Check if pretrained model exists
    if os.path.exists(actor_path) and os.path.exists(critic_path):
        print("Loading pretrained models...")
        actor = Actor(input_size=4, num_actions=2)
        critic = Critic(input_size=4)
        load_model(actor, critic, actor_path, critic_path)
    else:
        print("Training new models...")
        actor = Actor(input_size=4, num_actions=2)
        critic = Critic(input_size=4)
        episodes = 3000  # Number of episodes for training
        stats = actor_critic(actor, critic, episodes)
        save_model(actor, critic, actor_path, critic_path)
    
    # Test the trained agent in human mode
    env = gym.make('CartPole-v1',render_mode='human')
    state = env.reset()[0]
    done = False
    total_reward = 0
    max_steps = 2000  # Maximum steps per episode for testing
    
    while not done:
        env.render()
        state_tensor = torch.FloatTensor(state)
        action_probs = actor(state_tensor)
        action = torch.argmax(action_probs).item()
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated | truncated
        total_reward += reward
        if total_reward >= max_steps:
            break
    
    print(f"Total reward in human mode: {total_reward}")
    env.close()

profile
+AI to AI+

0개의 댓글