Here's an example code snippet in PyTorch for training a variational autoencoder (VAE) for image reconstruction:
import torch
import torch.nn as nn
import torch.optim as optim

class VAE(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super(VAE, self).__init__()
        # Convolutional encoder; with 64x64 inputs the final feature map is
        # 256 x 2 x 2 = 1024, matching the linear layers below.
        self.encoder = nn.Sequential(
            nn.Conv2d(input_dim, 32, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2),
            nn.ReLU()
        )
        self.mu = nn.Linear(1024, latent_dim)
        self.logvar = nn.Linear(1024, latent_dim)
        # Transposed-convolutional decoder mapping a latent vector back to a
        # 64x64 image with pixel values in [0, 1].
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(latent_dim, 128, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(32, input_dim, kernel_size=6, stride=2),
            nn.Sigmoid()
        )

    def encode(self, x):
        x = self.encoder(x)
        x = torch.flatten(x, start_dim=1)
        mu = self.mu(x)
        logvar = self.logvar(x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        # Sample z = mu + sigma * eps with eps ~ N(0, I)
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def decode(self, z):
        z = z.view(z.size(0), z.size(1), 1, 1)
        x = self.decoder(z)
        return x

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar
# Define training loop
def train_vae(model, train_loader, optimizer, criterion, device):
    model.train()
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss = criterion(recon_batch, data, mu, logvar)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    return train_loss
# Train the VAE
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae = VAE(input_dim=3, latent_dim=32).to(device)
optimizer = optim.Adam(vae.parameters(), lr=1e-3)

# Standard VAE criterion: reconstruction term plus KL divergence to the prior
def vae_criterion(recon_x, x, mu, logvar):
    recon_loss = nn.functional.binary_cross_entropy(recon_x, x, reduction='sum')
    kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kld

criterion = vae_criterion
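The snippet above only wires the model, optimizer, and loss together; a minimal sketch of the outer epoch loop, assuming a train_loader that yields batches of 64x64 RGB images scaled to [0, 1] (e.g. from a torchvision dataset) and a hypothetical num_epochs value, could look like this:
# Hypothetical outer loop; train_loader and num_epochs are assumptions, not
# defined by the snippet above.
num_epochs = 10
for epoch in range(num_epochs):
    epoch_loss = train_vae(vae, train_loader, optimizer, criterion, device)
    print(f"Epoch {epoch + 1}: total training loss {epoch_loss:.2f}")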
Here's an example code snippet in PyTorch for training an actor-critic model:
import torch
import torch.nn as nn
import torch.optim as optim

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action = torch.tanh(self.fc3(x))
        return action

class Critic(nn.Module):
    def __init__(self, state_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        value = self.fc3(x)
        return value
# Define training loop
def train_actor_critic(actor, critic, optimizer, memory, device, gamma=0.99):
    actor.train()
    critic.train()
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = memory.sample()
    state_batch = torch.FloatTensor(state_batch).to(device)
    action_batch = torch.FloatTensor(action_batch).to(device)
    reward_batch = torch.FloatTensor(reward_batch).to(device)
    next_state_batch = torch.FloatTensor(next_state_batch).to(device)
    done_batch = torch.FloatTensor(done_batch).to(device)

    # Critic update: regress V(s) towards the one-step TD target
    with torch.no_grad():
        next_value = critic(next_state_batch).squeeze(1)
        target_value = reward_batch + gamma * (1 - done_batch) * next_value
    value = critic(state_batch).squeeze(1)
    critic_loss = nn.functional.mse_loss(value, target_value)
    optimizer.zero_grad()
    critic_loss.backward()
    optimizer.step()

    # Actor update: a simplified surrogate that scales the actor's output by the
    # advantage (not a log-probability policy gradient)
    advantage = target_value - value.detach()
    actor_loss = -torch.mean(advantage.unsqueeze(1) * actor(state_batch))
    optimizer.zero_grad()
    actor_loss.backward()
    optimizer.step()
    return actor_loss.item(), critic_loss.item()
# Train the actor-critic model
# state_dim, action_dim, env, num_episodes, max_steps_per_episode and
# batch_size are assumed to be defined elsewhere.
actor = Actor(state_dim=state_dim, action_dim=action_dim).to(device)
critic = Critic(state_dim=state_dim).to(device)
optimizer = optim.Adam(list(actor.parameters()) + list(critic.parameters()), lr=1e-3)
memory = ReplayBuffer(buffer_size=100000)

for i in range(num_episodes):
    state = env.reset()
    for t in range(max_steps_per_episode):
        with torch.no_grad():
            action = actor(torch.FloatTensor(state).to(device)).cpu().numpy()
        next_state, reward, done, _ = env.step(action)
        memory.add(state, action, reward, next_state, done)
        if len(memory) >= batch_size:
            train_actor_critic(actor, critic, optimizer, memory, device)
        state = next_state
        if done:
            break
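The training loop above relies on a ReplayBuffer class that is not shown. A minimal sketch that matches the add / sample / len usage in the snippet (uniform sampling of batch_size transitions, with batch_size assumed to be defined alongside the other hyperparameters) could look like this:
import random
from collections import deque
import numpy as np

# Hypothetical replay buffer sketch matching the usage above
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer = deque(maxlen=buffer_size)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self):
        # batch_size is assumed to be a global hyperparameter, matching
        # memory.sample() being called with no arguments above
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)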
Here's an example code snippet in PyTorch for selecting actions using the trained actor-critic model:
# Use the trained actor-critic model to select actions
def select_action(state, actor, device):
    actor.eval()
    with torch.no_grad():
        state = torch.FloatTensor(state).to(device)
        action = actor(state)
        action = action.cpu().numpy()
    return action
# Interact with the environment using the trained actor-critic model
state = env.reset()
for t in range(max_steps_per_episode):
    action = select_action(state, actor, device)
    next_state, reward, done, _ = env.step(action)
    memory.add(state, action, reward, next_state, done)
    if len(memory) >= batch_size:
        train_actor_critic(actor, critic, optimizer, memory, device)
    state = next_state
    if done:
        break
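Because the actor is deterministic (a tanh-squashed output), interaction loops like the one above typically add exploration noise while still collecting training data; a minimal sketch, assuming actions live in [-1, 1] and a hypothetical noise_std parameter:
import numpy as np

# Hypothetical helper: Gaussian exploration noise around the deterministic action
def select_action_with_noise(state, actor, device, noise_std=0.1):
    action = select_action(state, actor, device)
    noisy_action = action + np.random.normal(0.0, noise_std, size=action.shape)
    return np.clip(noisy_action, -1.0, 1.0)  # keep actions inside the tanh range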
And here's an example code snippet in PyTorch for updating the world model using the experiences collected during interactions with the environment:
# Update the world model using collected experiences
def update_world_model(world_model, optimizer, memory, device):
    world_model.train()
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = memory.sample()
    state_batch = torch.FloatTensor(state_batch).to(device)
    action_batch = torch.FloatTensor(action_batch).to(device)
    reward_batch = torch.FloatTensor(reward_batch).to(device)
    next_state_batch = torch.FloatTensor(next_state_batch).to(device)
    done_batch = torch.FloatTensor(done_batch).to(device)

    # Encode the next state without gradients so it serves as a fixed target
    with torch.no_grad():
        next_latent_state, _ = world_model.encode(next_state_batch)

    # Predict the next latent state and reward from the current latent state/action
    latent_state, latent_action = world_model.encode(state_batch, action_batch)
    latent_next_state_pred, reward_pred = world_model.predict_next_latent_state_and_reward(latent_state, latent_action)
    latent_next_state_pred_error = nn.functional.mse_loss(latent_next_state_pred, next_latent_state)
    reward_pred_error = nn.functional.mse_loss(reward_pred, reward_batch)
    loss = latent_next_state_pred_error + reward_pred_error
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()
# Set up the world model and update it while interacting with the environment
# (latent_dim is assumed to be defined alongside the other hyperparameters)
world_model = WorldModel(state_dim=state_dim, action_dim=action_dim, latent_dim=latent_dim).to(device)
optimizer = optim.Adam(world_model.parameters(), lr=1e-3)
memory = ReplayBuffer(buffer_size=100000)

for i in range(num_episodes):
    state = env.reset()
    for t in range(max_steps_per_episode):
        action = select_action(state, actor, device)
        next_state, reward, done, _ = env.step(action)
        memory.add(state, action, reward, next_state, done)
        if len(memory) >= batch_size:
            update_world_model(world_model, optimizer, memory, device)
        state = next_state
        if done:
            break
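The WorldModel class itself is never defined in these snippets. One possible sketch that is consistent with the calls above (encode(state) and encode(state, action) both returning a latent state and an optional latent action, and a predictor returning the next latent state plus a per-transition scalar reward) is shown below; the layer sizes are illustrative assumptions, not something the original code specifies:
# Hypothetical WorldModel sketch matching the encode /
# predict_next_latent_state_and_reward calls used in update_world_model
class WorldModel(nn.Module):
    def __init__(self, state_dim, action_dim, latent_dim):
        super(WorldModel, self).__init__()
        self.state_encoder = nn.Sequential(
            nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, latent_dim))
        self.action_encoder = nn.Sequential(
            nn.Linear(action_dim, 128), nn.ReLU(), nn.Linear(128, latent_dim))
        self.dynamics = nn.Sequential(
            nn.Linear(2 * latent_dim, 128), nn.ReLU(), nn.Linear(128, latent_dim))
        self.reward_head = nn.Sequential(
            nn.Linear(2 * latent_dim, 128), nn.ReLU(), nn.Linear(128, 1))

    def encode(self, state, action=None):
        latent_state = self.state_encoder(state)
        latent_action = self.action_encoder(action) if action is not None else None
        return latent_state, latent_action

    def predict_next_latent_state_and_reward(self, latent_state, latent_action):
        x = torch.cat([latent_state, latent_action], dim=1)
        # Next latent state prediction and a scalar reward per transition
        return self.dynamics(x), self.reward_head(x).squeeze(1)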