Pytorch로 Transformer 구현해보기 (3/3)

이권동·2022년 4월 1일
0

공부 정리를 위한 글입니다.

트랜스포머


3. 트랜스포머

트랜스포머의 구조는 N개의 인코더와 디코더가 쌓여있고 입력 문장(소스 문장)을 입력하면 인코더에서 해당 문장에 대한 표현을 학습시키고, 그 결과값을 디코더에 보내면 디코더에서 타깃 문장을 생성한다.

디코더는 vocab에 대한 확률 분포를 예측하고 확률이 가장 큰 단어를 선택한다. 트랜스포머에서는 올바른 문장을 생성하려면 예측 확률 분포와 실제 확률 분포 사이의 차이를 최소화해야 한다. 이때 교차 엔트로피(cross-entropy)를 사용하여 분포의 차이를 알 수 있다. 이때 옵티마이저는 Adam 을 사용한다.

# transformers.py
import torch.nn as nn

from module.encoder import Encoder
from module.decoder import Decoder
from module.utils import make_mask

class Transformer(nn.Module):
    def __init__(self, vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout) -> None:
        super().__init__()
        self.enc_outputs = Encoder(vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout)
        self.dec_outputs = Decoder(vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout)
        self.output = nn.Linear(d_model, vocab_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, input, dec_input):
        enc_input = input
        dec_input = dec_input
        enc_padding_mask = make_mask(enc_input, "padding")
        dec_padding_mask = make_mask(enc_input, "padding")
        look_ahead_mask = make_mask(dec_input, "lookahead")

        enc_output = self.enc_outputs(enc_input, enc_padding_mask)
        dec_output = self.dec_outputs(enc_output, dec_input, dec_padding_mask, look_ahead_mask)

        outputs = self.output(dec_output)
        outputs = self.softmax(outputs)

        return outputs
# train.py
import torch
import torch.nn as nn
import pandas as pd
import re
import os
import matplotlib.pyplot as plt
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

from module.transformer import Transformer
from torchtext.legacy import data
from soynlp.tokenizer import LTokenizer

tokenizer = LTokenizer()

MAX_LENGTH = 60
D_MODEL = 128
NUM_LAYERS = 4
NUM_HEADS = 4
HIDDEN = 512
BATCH_SIZE = 64
NUM_EPOCH = 100
DROPOUT = 0.3

train_data = pd.read_csv("chatbot.csv")

questions = []
answers = []

for sentence in train_data['Q']:
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    questions.append(sentence)

for sentence in train_data['A']:
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    answers.append(sentence)


Q = data.Field(
    sequential=True,
    use_vocab=True,
    lower=True,
    tokenize=tokenizer,
    batch_first=True,
    init_token="<SOS>",
    eos_token="<EOS>",
    fix_length=MAX_LENGTH
)

A = data.Field(
    sequential=True,
    use_vocab=True,
    lower=True,
    tokenize=tokenizer,
    batch_first=True,
    init_token="<SOS>",
    eos_token="<EOS>",
    fix_length=MAX_LENGTH
)

train_data_set = data.TabularDataset(
    path="chatbot.csv", 
    format="csv", 
    skip_header=False, 
    fields=[("Q",Q), ('A',A)]
)

print('훈련 샘플의 개수 : {}'.format(len(train_data_set)))

Q.build_vocab(train_data_set.Q, train_data_set.A, min_freq = 2)
A.vocab = Q.vocab

VOCAB_SIZE = len(Q.vocab)

PAD_TOKEN, START_TOKEN, END_TOKEN, UNK_TOKEN = Q.vocab.stoi['<pad>'], Q.vocab.stoi['<SOS>'], Q.vocab.stoi['<EOS>'], Q.vocab.stoi['<unk>']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iter = data.BucketIterator(
    train_data_set, batch_size=BATCH_SIZE,
    shuffle=True, repeat=False, sort=False, device=device
)


print(VOCAB_SIZE)
transformer = Transformer(VOCAB_SIZE, NUM_LAYERS, D_MODEL, NUM_HEADS, HIDDEN, MAX_LENGTH, DROPOUT)

# 네트워크 초기화
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        # Liner층의 초기화
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)

# 훈련 모드 설정
transformer.train()

# TransformerBlock모듈의 초기화 설정
transformer.apply(weights_init)

print('네트워크 초기화 완료')

# 손실 함수의 정의
criterion = nn.CrossEntropyLoss()

# 최적화 설정
learning_rate = 2e-4
optimizer = torch.optim.Adam(transformer.parameters(), lr=learning_rate)

best_epoch_loss = float("inf")
epoch_ = []
epoch_train_loss = []

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("사용 디바이스:", device)
print('-----start-------')
transformer.to(device)

# 네트워크가 어느정도 고정되면 고속화
torch.backends.cudnn.benchmark = True

for epoch in range(NUM_EPOCH):
    epoch_loss = 0.0
    count = 0
    for batch in train_iter:
        questions = batch.Q.to(device)
        answers = batch.A.to(device)
        with torch.set_grad_enabled(True):
            preds = transformer(questions, answers)
            pad = torch.LongTensor(answers.size(0), 1).fill_(PAD_TOKEN).to(device)
            preds_id = torch.transpose(preds,1,2)
            outputs = torch.cat((answers[:, 1:], pad), -1)
            optimizer.zero_grad()
            loss = criterion(preds_id, outputs)  # loss 계산
            loss.backward()
            torch.nn.utils.clip_grad_norm_(transformer.parameters(), 0.5)
            optimizer.step()
            epoch_loss +=loss.item()
            count += 1
    epoch_loss = epoch_loss / count

    if not best_epoch_loss or epoch_loss < best_epoch_loss:
        if not os.path.isdir("snapshot"):
            os.makedirs("snapshot")
        torch.save(transformer.state_dict(), "./snapshot/transformer.pt")
    
    epoch_.append(epoch)
    epoch_train_loss.append(epoch_loss)

    print(f"Epoch {epoch+1}/{NUM_EPOCH} Average Loss: {epoch_loss}")

fig = plt.figure(figsize=(8,8))
fig.set_facecolor('white')
ax = fig.add_subplot()

ax.plot(epoch_,epoch_train_loss, label='Average loss')


ax.legend()
ax.set_xlabel('epoch')
ax.set_ylabel('loss')

plt.savefig('train.png')
profile
배워서 효율적으로 써먹자

0개의 댓글