Genetic GFlowNet 생성 중 시행착오(part1)

seongyun·2025년 3월 4일

Neural Network

목록 보기
1/8

Genetic GFlowNet은 생성적 흐름 네트워크(GFlowNet)와 유전 알고리즘을 결합한 모델로, 분자 설계의 효율성을 높이기 위해 개발됨. 이 모델은 GFlowNet을 통해 분자를 생성하고, 유전 알고리즘을 적용하여 다양한 분자 구조를 탐색함으로써 신약 후보 물질의 발견을 가속화 시킴.


📌 진행 계획

1) GFlowNet 이해 및 기본 모델 구현

GFlowNet은 탐색 공간을 샘플링하는 확률적 모델로, 강화 학습과 유사한 방식으로 동작함.
PyTorch를 사용하여 기본적인 GFlowNet을 구축.

2) 유전 알고리즘 적용

GFlowNet이 생성한 분자 구조를 평가하고, 유전 알고리즘을 사용해 더 나은 샘플을 탐색.
돌연변이(Mutation)와 교차(Crossover) 연산 적용.

3) 모델 최적화 및 실험

생성된 분자 구조의 품질을 평가하는 손실 함수 설계.
학습 과정 튜닝(학습률 조정, Adam 옵티마이저 활용 등).


1단계: Genetic GFlowNet의 핵심 개념

Genetic GFlowNet은 GFlowNet(생성적 흐름 네트워크)유전 알고리즘(Genetic Algorithm, GA)을 결합한 모델

📌 GFlowNet이란?

강화 학습(RL)처럼 동작하면서 다양한 샘플을 생성하는 신경망
목표: 에너지가 낮은 (또는 보상이 높은) 샘플을 생성
일반적인 강화 학습과 차이점
RL은 보상이 높은 하나의 최적해를 찾지만, GFlowNet은 다양한 좋은 샘플을 생성하는 데 집중!

📌 유전 알고리즘(GA)이란?

자연선택과 돌연변이를 모방하여 최적의 해를 찾는 알고리즘
개체(샘플)를 선택 → 교차 → 변이 과정을 반복하며 최적화
Genetic GFlowNet에서는 GFlowNet이 생성한 샘플을 평가하여 더 좋은 샘플을 탐색


2단계: GFlowNet 기본 구현 (PyTorch)

📌 해야 할 일

MDP (Markov Decision Process) 기반으로 GFlowNet 구축
PyTorch로 정책 네트워크 (Policy Network) 설계
학습 과정에서 보상 함수 (Reward Function) 정의

코드 구현

# 1
import torch
import torch.nn as nn
import torch.optim as optim

# GFlowNet 정책 네트워크 정의
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.fc3(x), dim=-1)  # 확률 분포 출력

# 환경(Environment) 설정
class Environment:
    def __init__(self):
        self.state_dim = 10  # 상태 공간 크기
        self.action_dim = 5  # 행동 공간 크기

    def reset(self):
        return torch.rand(self.state_dim)  # 랜덤 초기 상태

    def step(self, action):
        reward = torch.randn(1).item()  # 임의의 보상
        next_state = torch.rand(self.state_dim)  # 다음 상태
        done = torch.rand(1).item() > 0.95  # 종료 여부 랜덤 결정
        return next_state, reward, done

# GFlowNet 학습 과정
def train_gflownet():
    env = Environment()
    policy_net = PolicyNetwork(state_dim=env.state_dim, action_dim=env.action_dim)
    optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
    loss_fn = nn.MSELoss()

    for episode in range(100):  # 100번 반복 학습
        state = env.reset()
        total_reward = 0

        for _ in range(10):  # 한 에피소드에서 10번 실행
            action_prob = policy_net(state)
            action = torch.multinomial(action_prob, 1).item()
            next_state, reward, done = env.step(action)

            # 손실 계산 및 역전파
            loss = loss_fn(torch.tensor(reward), action_prob[action])
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state
            total_reward += reward

            if done:
                break

        print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}")

# 실행
train_gflownet()

위의 코드에서 중 step(self, action) 함수에서 action이 들어가지만 실제로 사용되지 않았음.


해결 방법: action을 환경에 반영하기

GFlowNet에서는 행동(action)을 사용하여 상태(state)를 변화시키고 보상(reward)을 주어야 함.

def step(self, action):
    # 행동을 반영하여 새로운 상태 생성
    next_state = self.current_state + (action - 2) * 0.1  # 예제: 행동이 상태 변화에 영향을 줌
    next_state = torch.clamp(next_state, 0, 1)  # 값이 0~1 사이로 유지되도록 제한

    # 행동(action)에 따라 보상(reward) 결정
    reward = -torch.sum((next_state - 0.5) ** 2).item()  # 목표 상태(0.5)에 가까울수록 보상이 높음

    # 종료 여부 설정 (예제: 10번 이상 움직이면 종료)
    self.steps += 1
    done = self.steps >= 10

    return next_state, reward, done

이 문제를 해결한 이후 바로 나온 오류 메시지를 보면 dim=1이 잘못 설정됐다고 나와있음.

(IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1))


오류 원인 분석

입력 state가 1차원(torch.Size([10]))인데, policy_net(state) 호출 시 dim=1으로 softmax 적용
softmax(dim=1)을 적용하려면 입력 차원이 2D (batch_size, feature_size)여야 함
현재 state는 1차원(torch.Size([10])), 따라서 dim=1이 범위를 초과하여 오류 발생

해결 방법

1) state를 2D 텐서로 변환 : state가 (1, 10) 형태가 되면 softmax(dim=1) 적용 가능
2) dim=-1로 변경 : dim=-1은 마지막 차원을 의미하므로, 1차원 텐서에도 안전하게 적용 가능


수정된 코드

import torch
import torch.nn as nn
import torch.optim as optim

# GFlowNet 정책 네트워크 정의
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.fc3(x), dim=-1)  # 오류 해결 (dim=-1 사용)

# 환경(Environment) 설정
class Environment:
    def __init__(self):
        self.state_dim = 10  # 상태 공간 크기
        self.action_dim = 5  # 행동 공간 크기
        self.current_state = torch.rand(self.state_dim)  # 초기 상태
        self.steps = 0  # 스텝 카운터 추가

    def reset(self):
        self.current_state = torch.rand(self.state_dim)  # 새로운 랜덤 초기 상태
        self.steps = 0  # 스텝 초기화
        return self.current_state

    def step(self, action):
        next_state = self.current_state + (action - 2) * 0.1  
        next_state = torch.clamp(next_state, 0, 1)  # 값이 0~1 사이로 유지되도록 제한

        reward = -torch.sum((next_state - 0.5) ** 2).item()  

        self.steps += 1
        done = self.steps >= 10

        self.current_state = next_state  # 상태 업데이트
        return next_state, reward, done

# GFlowNet 학습 과정
def train_gflownet():
    env = Environment()
    policy_net = PolicyNetwork(state_dim=env.state_dim, action_dim=env.action_dim)
    optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
    loss_fn = nn.MSELoss()

    for episode in range(100):  
        state = env.reset()
        total_reward = 0

        for _ in range(10):  
            state = state.unsqueeze(0)  # 차원 확장 (1D → 2D)
            action_prob = policy_net(state)
            action = torch.multinomial(action_prob, 1).item()
            state = state.squeeze(0)  

            next_state, reward, done = env.step(action)

            loss = loss_fn(torch.tensor([reward]), action_prob[:, action])

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state
            total_reward += reward

            if done:
                break

        print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}")

# 실행
train_gflownet()

오류 없이 진행은 되었지만, 결과 값은 전부 음수.(Total Reward가 계속 음수로 유지되고 있어 학습이 제대로 이루어지지 않는 것 같다고 판단)


1. 문제점 분석

1) 보상 함수(reward function)가 너무 단순함

• 현재 보상은 reward = -torch.sum((next_state - 0.5) ** 2).item()
• 즉, next_state가 0.5에 가까울수록 보상이 커지는데, 이는 일반적인 환경과 다름
• 신경망이 이 목표를 학습하기 어려울 수 있음

2) 행동(action) 선택 방식이 불안정

• 현재 torch.multinomial(action_prob, 1).item()으로 행동을 선택하는데,
→ 만약 action_prob의 분포가 초기에 편향되면 비효율적인 행동을 지속적으로 선택할 가능성이 큼
• 정책이 충분히 학습되지 않으면 랜덤 선택과 다름없음

3) 손실 함수(Loss function)의 문제

• loss = loss_fn(torch.tensor([reward]), action_prob[:, action])에서
→ reward는 스칼라인데 action_prob[:, action]는 확률값
• 행동 확률과 보상을 직접 비교하는 것은 학습 안정성을 저하시킬 가능성이 있음


2. 해결 방법

1) 보상 함수 개선

• 단순히 0.5에 가까운지 보는 대신 목표 방향으로 이동하면 보상을 주도록 설정
next_state가 일정 방향으로 이동하면 보상을 증가

2) 행동 선택 방식 변경

• torch.multinomial(action_prob, 1).item() 대신 탐색을 위한 ε-greedy 전략 사용

3) 손실 함수 수정

• reward를 직접 사용하는 대신 Advantage를 고려한 업데이트 방식 적용


3. 개선된 코드

다음 코드에서는 학습 안정성을 높이기 위해 ε-greedy 탐색과 Advantage 기반 업데이트를 적용

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# GFlowNet 정책 네트워크 정의
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.fc3(x), dim=-1)  # 행동 확률 출력

# 환경(Environment) 설정
class Environment:
    def __init__(self):
        self.state_dim = 10  # 상태 공간 크기
        self.action_dim = 5  # 행동 공간 크기
        self.current_state = torch.rand(self.state_dim)  # 초기 상태
        self.steps = 0  # 스텝 카운터 추가

    def reset(self):
        self.current_state = torch.rand(self.state_dim)  # 새로운 랜덤 초기 상태
        self.steps = 0  # 스텝 초기화
        return self.current_state

    def step(self, action):
        # 행동(action)에 따라 상태 변화
        next_state = self.current_state + (action - 2) * 0.1  
        next_state = torch.clamp(next_state, 0, 1)  # 값이 0~1 사이로 유지되도록 제한

        # 보상 함수 개선 (목표 상태 방향으로 이동하면 보상 증가)
        reward = -torch.sum((next_state - 0.7) ** 2).item()  # 목표를 0.7로 설정

        self.steps += 1
        done = self.steps >= 10

        self.current_state = next_state  # 상태 업데이트
        return next_state, reward, done

# GFlowNet 학습 과정
def train_gflownet():
    env = Environment()
    policy_net = PolicyNetwork(state_dim=env.state_dim, action_dim=env.action_dim)
    optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
    loss_fn = nn.MSELoss()
    
    gamma = 0.99  # 할인율
    epsilon = 0.1  # 탐색 비율

    for episode in range(100):  
        state = env.reset()
        total_reward = 0

        for _ in range(10):  
            state = state.unsqueeze(0)  # 차원 확장 (1D → 2D)
            action_prob = policy_net(state).detach().numpy()[0]

            # ε-greedy 행동 선택 적용
            if np.random.rand() < epsilon:
                action = np.random.choice(env.action_dim)  # 랜덤 탐색
            else:
                action = np.argmax(action_prob)  # 가장 높은 확률의 행동 선택

            state = state.squeeze(0)  # 차원 원래대로

            next_state, reward, done = env.step(action)

            # 손실 함수 개선 (Advantage 기반)
            advantage = reward + gamma * np.max(action_prob) - action_prob[action]
            loss = loss_fn(torch.tensor([advantage]), torch.tensor([action_prob[action]]))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state
            total_reward += reward

            if done:
                break

        print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}")

# 실행
train_gflownet()

하지만 이 코드의 문제는 state.squeeze(0)를 호출했지만, 그 결과를 변수에 저장하지 않았다.

for episode in range(100):
    state = env.reset()
    total_reward = 0

    for _ in range(10):
        state = state.unsqueeze(0)  # (1, state_dim)으로 변환
        action_prob = policy_net(state).detach().numpy()[0]
        # state = state.squeeze(0)  불필요 -> 삭제 가능

        if np.random.rand() < epsilon:
            action = np.random.choice(env.action_dim)
        else:
            action = np.argmax(action_prob)

        next_state, reward, done = env.step(action)

        advantage = reward + gamma * np.max(action_prob) - action_prob[action]
        loss = loss_fn(torch.tensor([advantage]), torch.tensor([action_prob[action]]))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        state = next_state
        total_reward += reward

        if done:
            break

    print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}")

그래서 state.squeeze(0)을 삭제하고 진행을 했지만 오류는 계속 나옴.
이번 오류의 핵심 원인은 loss.backward()를 호출할 때, loss 텐서가 requires_grad=True 상태가 아니어서 역전파가 불가능하기 때문

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

이 오류는 loss 텐서가 requires_grad=True가 아니므로 역전파(gradient 계산)를 할 수 없다는 뜻

문제의 원인은 loss_fn(torch.tensor([...]), torch.tensor([...])) 부분에 있음.
아래 코드를 보면, torch.tensor([...])로 새로운 텐서를 만들 때 자동으로 requires_grad=False로 설정됨.

loss = loss_fn(torch.tensor([advantage]), torch.tensor([action_prob[action]]))

여기서 torch.tensor([...])로 감싸면 새로운 텐서가 생성되며, 자동으로 requires_grad=False가 되어 버림.


해결 방법

1️⃣ torch.tensor([...]) 대신 torch.tensor([...], requires_grad=True) 사용

손실(loss) 계산을 위해 requires_grad=True를 명시적으로 설정해야 함.

2️⃣ torch.tensor([...]) 대신 torch.from_numpy() 사용

이미 numpy 배열로 존재하는 데이터를 torch.Tensor로 변환할 때는 torch.from_numpy()를 사용하는 것이 더 적절함.
→ torch.from_numpy()를 사용하면 PyTorch 텐서로 변환되며, 이 텐서는 기본적으로 requires_grad=True가 유지됨.

3️⃣ torch.tensor()를 안 쓰고 그냥 텐서를 직접 사용

사실상 advantage와 action_prob[action]이 이미 torch.Tensor라면 굳이 torch.tensor([...])를 만들 필요 없음.
unsqueeze(0)를 추가하는 이유: loss_fn() 함수가 1D 텐서가 아니라 2D 텐서를 기대할 수도 있으므로 차원을 맞춰주는 것이 좋음.


부분 수정 코드

advantage_tensor = torch.tensor([advantage], dtype=torch.float32, requires_grad=True)
action_prob_tensor = torch.tensor([action_prob[action]], dtype=torch.float32, requires_grad=True)

loss = loss_fn(advantage_tensor, action_prob_tensor)

하지만 저렇게 코드 수정을 진행하면 gamma가 활성화되지 못하는 문제가 발생함.

part2에서 다음 문제부터 진행

0개의 댓글