본 블로그 포스팅은 수도권 ICT 이노베이션 스퀘어에서 진행하는 AI 핵심 기술 집중 클래스의 자연어처리(NLP) 강좌 내용을 필자가 다시 복기한 내용에 관한 것입니다.
이전 포스트 NLP- Seq to Seq (4) : 번역를 요약하자면

훈련/검증 성능은 상당히 우수하게 나왔는데
실제 모델을 사용하는 '추론' 결과를 보면
번역 품질이 상당히 나쁜것을 확인할 수 있다.
왜 이런참사가 났는지를 요약을 하자면
생성모델(Generative Models)에 속하는 Seq2Seq을 그동안 하던 대로 판별 모델 (Discriminative Models)을 실습하듯이 훈련을 진행했기에 발생한 문제이다.
여기서 잠깐 생성모델(Generative Models)과 판별 모델 (Discriminative Models)에 대해 설명을 하자면

위 사진처럼 판별 모델은 위 사진처럼 여러 데이터()가 색상별로 라벨링()이 되어있다면,
각 데이터 군집을 잘 구분할 수 있는 결정경계(Decision Boundary)를 학습하는 것을 목표로 한다.
따라서 학습 시에는 색상별로 라벨링()이 되어 있음을 매 학습때마다 알아야 하는 것이 전제조건이다.
즉, 매 학습 단계마다 정답 라벨()참조를 해야한다.

반대로 생성모델(Generative Models)은 역시 데이터()가 색상별로 라벨링()처리되어 군집화가 되어 있다면
이 군집(확률분포 : Probability Distribution)을 학습하는 것으로
최종적으로 군집 내에 위치할법한 임의의 데이터를 생성하는 것이다.
여기서 중요한 것은 라벨링() 데이터는 위치정보이지
분포는 영역으로
Position과 Region의 차이점이 발생한다.
어떻게 보면 0차원 데이터인 점(Position)으로 최소 2차원 이상인 데이터 : 영역(Region)을 유추하라는 것이니 상당히 골이 아파지는 문제를 풀어내는게 생성모델(Generative Models)이라 볼 수 있다.
따라서 이 문제를 해결하기 기존의 지도학습과 라벨링 데이터를 의도적으로 배제한 비지도학습 방법론을 혼합하여 모델을 학습시킨다.

이 때 지도학습 / 비 지도학습의 비율을 조정을 해야 하는데
만약 지도학습 비율이 너무 낮아지게 되면 분포는 학습하긴 하지만 그 영역의 크기가 너무 커져서 생성되는 데이터의 랜덤성이 무한에 가깝게 올라가는 문제점이 있고,
지도학습의 비율이 너무 높으면 영역의 크기가 상당히 작아지게 되어 생성하는 데이터가 있을법한 위치를 선택하는게 모델 입장에서는 몇가지 안 남게 되버린다.
따라서 이 비율을 잘 조정하는것, 이것이 바로 Techer forcing(교사강요)이다.
이게... 참 말로 설명하기 어려운 내용이긴 한데
좀더 직접적인 사례를 들어 설명을 하자면
아래의 영상을 통해서 설명이 가능할 듯 하다.
코딩공부를 할 때 코파일럿을 사용하면
어려움은 즉각적으로 사라지고, 오류도 빠르게 수정하는 등
생산성이 올라가는 효용성이 있지만
머릿속에는 하나도 남는게 없다
라고 토로하는 영상이다.

즉 위 gif처럼 함수명만 입력하면
함수 내에 동작해야 할 모든 workflow를 전부 자동으로 코드화 해주지만
코파일럿을 off하고 직접 함수 내 동작구조를 일일이 코딩연습을 할 필요성이 있음을 언급하는 것이다.
필자는 이것이 Techer forcing(교사강요) 문제에 해당한다 생각한다.
코파일럿을 통해 정답을 알려주니까
마치 내가 코딩마스터가 된 기분이지만
진짜 시험장에 입장해서 코딩문제를 오프라인에서 풀어보라고 하면 1줄도 제대로 쓰지 못하는 상황
이게 이전포스트 NLP- Seq to Seq (4) : 번역 에 발생한
정답지를 계속 알려주는
학습/검증 성능은 100%가 나오지만
막상 정답지 없이 추론할 때는
번역성능이 형편없어지는 문제가 발생하는 것이다.
즉, 머리를 싸매고 디버깅을 해야 하듯이
번역(문장생성)도 정답지 없이 AI모델이 스스로 고뇌하는 시간이 필요하다는 것이다.
따라서 결론을 정리하자면
Seq2Seq는 생성모델이고, 라벨링 데이터는 지도학습/비지도학습 비율을 조정해가면서 입력해야 한다.
Seq2Seq를 훈련시킬 때
지도학습 / 비 지도학습 비율을 조정해야 하는 것은 알게 되었다.
그러나 문제가 있다

위 사진처럼 인코더에 데이터()가 입력되고
디코더에는 라벨()이 입력되는 구조이기에
비 지도학습으로 Seq2Seq를 구동시키려면 디코더에 뭘 입력해야 할지 모르는 상황이 발생한다.
어쨋든 디코더도 입력이 있어야 동작을 할 것이 아닌가?
그래서 나온 생성모델의 하위 분류가 Auto-Regressive model(자가회귀 모델)이다.
이 부분은 이전포스트 NLP- Seq to Seq (4) : 번역의 추론 과정에서도 한번 언급만 하고 넘어가긴 했지만
현재 상태의 출력값을 다음 상태의 입력값으로 사용하는 것
이것이 자가회귀 모델이다.
디코더도 구동하려면 입력정보가 필요하니 이전상태의 출력정보를 현재상태의 입력정보로 재귀하여 사용하는 것이다.
따라서 자가 회귀모델을 적용하여 비지도학습상태의 디코더를 살펴본다면 아래와 같아진다.

위 사진처럼 비지도 학습상태에서는 이전 디코더 셀의 출력결과가 현재 디코더 셀의 입력으로 사용되게끔
입력 시퀀스를 구성한다.
지금 실습자료가 번역 단어 생성이기에 시퀀스 데이터를 생성할 때는 자가회귀 모델을 사용하는게 감이 좀 안오는 부분이 있는데
이를 아래의 그래프로 놓고 보면 어느정도 연상이 되리라 생각한다.

위 내용을 통해서 Seq2Seq는 입력되는 라벨 데이터를 배제하는 비 지도학습 과정이 필요하며,
이 비 지도학습을 구현하는데는 자가회귀 모델방식으로 배제된 라벨 데이터를 매 seq마다 생성하여 다시 이를 입력으로 사용해야 함을 이해했을 것이다.
이제 이를 Seq2Seq의 훈련모드 부분에 적용하고자 한다.

Techer forcing(교사강요) 인자값을 적용한 Seq2Seq코드는 위와 같으며,
추론과정 코드는 이전 포스트에서 설명했으니
훈련/검증 과정의 코드만 설명을 진행하겠다.

이전포스트 NLP- Seq to Seq (4) : 번역에서
독일어 한국어 변환이 처참했으니
이를 다시 수행하고자 한다.

데이터 전처리
import os
path_1 = './data/TL_다국어-한국어_deko_기타'
path_2 = './data/TL_다국어-한국어_deko_다큐교양'
path_3 = './data/TL_다국어-한국어_deko_영화드라마'
file_1_list = os.listdir(path_1)
file_2_list = os.listdir(path_2)
file_3_list = os.listdir(path_3)
# 전체 파일 경로 리스트 생성
file_1_paths = [os.path.join(path_1, file) for file in file_1_list]
file_2_paths = [os.path.join(path_2, file) for file in file_2_list]
file_3_paths = [os.path.join(path_3, file) for file in file_3_list]
file_paths = file_1_paths + file_2_paths + file_3_paths
import json
parsered_data = [] # 파싱 데이터를 저장할 변수
for file_path in file_paths:
#json파일 불러오기
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 문서 정보 중 필요항목을 딕셔너리에 담고 리스트에 추가
raw_dict = {
"문서ID" : data['ID'],
"번역" : "독->한",
"원문(src)" : data['원문'],
"번역문(tar)" : data['최종번역문']
}
parsered_data.append(raw_dict)
import pandas as pd
# 파싱이 완료된 데이터를 Dataframe 형식으로 변환
raw_data = pd.DataFrame(parsered_data)
# 데이터 및 텍스트 전처리 함수를 모듈화 시킨 파일
from NLP_pp import *
# 데이터셋읜 결측치 & 중복치 제거 함수 실행
raw_data = df_cleaning(raw_data, '원문(src)')
raw_data = df_cleaning(raw_data, '번역문(tar)')
import re
def regex_sub_DE(sent):
# 입력 텍스트를 소문자로 변환
sent = sent.lower()
# 단어와 구두점 사이에 공백을 추가
sent = re.sub(r"([?.!,¿])", r" \1", sent)
# 다수 개의 공백을 하나의 공백으로 치환
sent = re.sub(r"\s+", " ", sent)
return sent.strip()
def regex_sub_KR(sent):
# 단어와 구두점 사이에 공백을 추가
sent = re.sub(r"([?.!,¿])", r" \1", sent)
# 단순하게 텍스트에서 다수 개의 공백을 하나의 공백으로 치환
sent = re.sub(r"\s+", " ", sent)
return sent.strip()
# 독일어, 한국어에 대한 데이터 클리닝 수행
raw_data['원문(src)'] = raw_data['원문(src)'].apply(regex_sub_DE)
raw_data['번역문(tar)'] = raw_data['번역문(tar)'].apply(regex_sub_KR)

여기까지는 이전포스트와 동일하게 수행했다
텍스트 전처리
# 인코더의 입력 데이터인 src 데이터
raw_src_data = raw_data['원문(src)'].values.tolist()
# 디코더의 정답지 데이터인 tar 데이터
raw_tar_data = raw_data['번역문(tar)'].values.tolist()
from transformers import BertTokenizer
# 독일어 토크나이저 모델: dbmdz/bert-base-german-uncased
german_tokenizer = BertTokenizer.from_pretrained("dbmdz/bert-base-german-uncased")
# 한국어 토크나이저 모델: kykim/bert-kor-base
korean_tokenizer = BertTokenizer.from_pretrained("kykim/bert-kor-base")
# 원문 토큰화 수행
tokenized_src_data = tokenize(raw_src_data, german_tokenizer, arch='Bert')
# 번역문 토큰화 수행
tokenized_tar_data = tokenize(raw_tar_data, korean_tokenizer, arch='Bert')
데이터셋 분리는 훈련/검증/평가 각각 80%/15%/5%로 나눈다
from sklearn.model_selection import train_test_split
# 훈련/검증/평가를 80%, 15%, 5%로 분할을 수행
# random_state -> 데이터셋을 내누는데 '재현성' 유지를 위해 넣음 -> 안넣어도 됨
# stratify -> y 클래스 비율을 알기 어렵기에 해당 항목은 없앰
src_train, src_etc, tar_train, tar_etc = train_test_split(
tokenized_src_data, tokenized_tar_data, test_size=0.20
)
# 그 외 데이터셋을 반반으로 Val, Test로 나눔
src_val, src_test, tar_val, tar_test = train_test_split(
src_etc, tar_etc, test_size=0.25
)
from collections import Counter
src_word_list = []
tar_word_list = []
# train항목을 워드 리스트에 입력
for src_sent, tar_sent in zip(src_train, tar_train):
for word in src_sent:
src_word_list.append(word)
for word in tar_sent:
tar_word_list.append(word)
# val항목을 워드 리스트에 입력
for src_sent, tar_sent in zip(src_train, tar_train):
for word in src_sent:
src_word_list.append(word)
for word in tar_sent:
tar_word_list.append(word)
# 단어와 해당 단어의 출몰 빈도를 함께 저장하는
# Counter 타입의 변수 생성
src_word_counts = Counter(src_word_list)
tar_word_counts = Counter(tar_word_list)

이번에는 희소단어 삭제 과정을 수행하지 않는다
즉 UNK토큰은 있긴 하지만 왠만해서는 사용될 일이 없다 보면 되겟다.
rare_th = 0 #희소단어의 등장 빈도를 결정하는 파라미터
# 희소단어 등장 빈도를 바탕으로 희소 단어를 배제하기 위해 준비 함수
print(f'---원문(src)에 대한 희소단어 분석---')
src_tot_vocab_cnt, src_rare_vocab_cnt = set_rare_vocab(src_word_counts, rare_th)
print(f'\n---번역문(tar)에 대한 희소단어 분석---')
tar_tot_vocab_cnt, tar_rare_vocab_cnt = set_rare_vocab(tar_word_counts, rare_th)
#등장 빈도가 높은 단어 순으로 정렬하기
src_vocab = sorted(src_word_counts, key=src_word_counts.get, reverse=True)
tar_vocab = sorted(tar_word_counts, key=tar_word_counts.get, reverse=True)
# 원문(src)에 대한 희소단어 배제 & 정렬 작업 수행
src_vocab_size = src_tot_vocab_cnt - src_rare_vocab_cnt
src_vocab = src_vocab[:src_vocab_size]
# 번역문(tar에 대한 희소단어 배제 & 정렬 작업 수행
tar_vocab_size = tar_tot_vocab_cnt - tar_rare_vocab_cnt
tar_vocab = tar_vocab[:tar_vocab_size]
# 스페셜 토큰 선언
spec_token = ['<PAD>', '<UNK>', '<SOS>', '<EOS>']
# 스페셜 토큰을 포함한 {단어:단어idx}의 딕셔너리 생성
print(f'---원문(src)에 대한 단어장 분석---')
src_to_idx, idx_to_src = set_word_to_idx(spec_token, src_vocab,
report=True)
print(f'\n---번역문(tar)에 대한 단어장 분석---')
tar_to_idx, idx_to_tar = set_word_to_idx(spec_token, tar_vocab,
report=True)

# 원문(src) 데이터셋의 정수 인코딩 수행
e_src_train = text_to_sequences(src_train, src_to_idx)
e_src_val = text_to_sequences(src_val, src_to_idx)
e_src_test = text_to_sequences(src_test, src_to_idx)
# 번역문(tar) 데이터셋의 정수 인코딩 수행
e_tar_train = text_to_sequences(tar_train, tar_to_idx)
e_tar_val = text_to_sequences(tar_val, tar_to_idx)
e_tar_test = text_to_sequences(tar_test, tar_to_idx)
번역문(tar)에 관하여 접두, 접미에 각각 SOS, EOS토큰을 붙이는 함수는 이전 포스트와 다르게
함수명을 prefix_suffix_token_insert로 재 명기했으며
이 함수는 NLP_pp.py에 추가하여 관리한다.
# 스페셜토큰에 접두토큰, 접미토큰이 있을 시
# 문서의 접두, 접미에 추가해주는 함수
def prefix_suffix_token_insert(e_docs, spec_token):
try:
en_sos = spec_token.index('<SOS>')
en_eos = spec_token.index('<EOS>')
except ValueError as e:
raise ValueError("SOS,EOS토큰이 없음") from e
res_e_doc = []
for e_doc in e_docs:
e_doc.insert(0, en_sos) #맨앞에 sos토큰추가
e_doc.append(en_eos) #맨 뒤에 eos토큰 추가
res_e_doc.append(e_doc)
return res_e_doc
# 번역문(tar)의 접두/접미에 SOS, EOS 토큰 추가
e_tar_train = prefix_suffix_token_insert(e_tar_train, spec_token)
e_tar_val = prefix_suffix_token_insert(e_tar_val, spec_token)
e_tar_test = prefix_suffix_token_insert(e_tar_test, spec_token)
# 원문(src)에 대한 문장패딩 하이퍼 파라미터 설정
src_seq_len = 60
set_sent_pad(e_src_train, src_seq_len, report=True)
# 번역문(tar)에 대한 문장패딩 하이퍼 파라미터 설정
tar_seq_len = 55
set_sent_pad(e_tar_train, tar_seq_len, report=True)

# 원문(src)의 문장 패딩(정수인코딩 완료)
padded_src_train = pad_seq_x(e_src_train, src_seq_len)
padded_src_val = pad_seq_x(e_src_val, src_seq_len)
padded_src_test = pad_seq_x(e_src_test, src_seq_len)
# 번역문(tar)의 문장 패딩(정수인코딩 완료)
padded_tar_train = pad_seq_x(e_tar_train, tar_seq_len)
padded_tar_val = pad_seq_x(e_tar_val, tar_seq_len)
padded_tar_test = pad_seq_x(e_tar_test, tar_seq_len)
import torch
bs = 256 # Batch_size 하이퍼 파라미터
# 정수(원핫)인코딩 데이터를 데이터로더로 변환
trainloader = set_dataloader(padded_src_train, padded_tar_train, bs,
content='훈련', report=True)
valloader = set_dataloader(padded_src_val, padded_tar_val, bs,
content='검증', report=True)
testloader = set_dataloader(padded_src_test, padded_tar_test, bs,
content='평가', report=True)

전처리 과정의 끝은 데이터로더 생성이니
이제 모델에 Techer forcing인자를 포함하어 재설계를 수행하도록 하겠다.
이제 본격적으로 Techer forcing인자를 도입한 Seq2Seq을 학습시켜보자
주요 하이퍼 파라미터 정의
# 주요 하이퍼 파라미터 정리
src_VS = len(src_to_idx) # 원문 단어장 개수
tar_VS = len(tar_to_idx) # 번역문 단어장 개수
src_SL = src_seq_len # 원문의 문장 길이
# 번역문의 문장 길이를 디코더(생성)문장 길이로 쓰자
tar_SL = tar_seq_len # 번역문 문장 길이
EMB_DIM = 256 # 인코더/디코더의 임베딩 레이어 차원
# unit_dim은 인코더-디코더 사이의 히든레이어처럼 생각하는게 편함
UNIT_DIM = 256 # 인코더와 디코더의 rnn_out 차원값
NUM_Layers = 2 # 인코더/디코더의 셀은 2층으로 만들자
BI_DIR = False # 단방향으로만 학습

모델 설계
각각의 인코더, 디코더를 먼저 선언 후 버전별 Seq2Seq를 붙이겠다.
import torch.nn as nn
class Encoder_LSTM(nn.Module):
def __init__(self, src_vocab, src_emb_dim, rnn_dim,
num_layer=1, bi=False):
super(Encoder_LSTM, self).__init__()
# <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
# word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
self.embed = nn.Embedding(src_vocab, src_emb_dim,
padding_idx=0)
self.lstm = nn.LSTM(input_size=src_emb_dim,#언어모델 입력차원
hidden_size=rnn_dim, #언어모델 출력차원
num_layers=num_layer, #언어모델 몇층?
bidirectional=bi, #양방향학습 On?
batch_first=True) #왠만하면 True
def forward(self, x): # x의 차원 : (BS, src_seq_len)
emb = self.embed(x) # (BS, src_seq_len, src_emb_dim)
# 인코더에 양방향 학습을 적용한다
# rnn_out : (bs, src_seq_len, hidden_dim * 2)
# hidden : (num_layer * 2, bs, hidden_dim)
rnn_out , (hidden, cell) = self.lstm(emb)
#인코더의 출력은 context_vector
return hidden, cell
import torch.nn as nn
class Decoder_LSTM(nn.Module):
def __init__(self, tar_vocab, tar_emb_dim, rnn_dim,
num_layer=1, bi=False):
super(Decoder_LSTM, self).__init__()
# <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
# word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
self.embed = nn.Embedding(tar_vocab, tar_emb_dim,
padding_idx=0)
self.lstm = nn.LSTM(input_size=tar_emb_dim,#언어모델 입력차원
hidden_size=rnn_dim, #언어모델 출력차원
num_layers=num_layer, #언어모델 몇층?
bidirectional=bi, #양방향학습 On?
batch_first=True) #왠만하면 True
# 디코더의 출력은 정답(번역문)의 seq_len이 되게 해야함
# 맞춰야 하는 클래스 개수는 정답지의 단어 개수임
if bi : #양방향으로 학습시에는 FC 레이어 입력차원이 두배
self.fc = nn.Linear(rnn_dim*2, tar_vocab)
else:
self.fc = nn.Linear(rnn_dim, tar_vocab)
# 디코더는 인코더의 context_vector을 초기 hidden으로 입력받는다.
def forward(self, x, hidden, cell): # x의 차원 : (BS, tar_seq_len)
emb = self.embed(x) # (BS, tar_seq_len, tar_emb_dim)
# 디코더에 양방향 학습을 적용한다
# rnn_out : (bs, tar_seq_len, hidden_dim * 2)
# hidden : (num_layer * 2, bs, hidden_dim)
rnn_out, (hidden, cell) = self.lstm(emb, (hidden,cell))
output = self.fc(rnn_out)
# 최종 출력은 (bs, seq_len, tar_vocab)
return output, hidden, cell
여기가 Seq2Seq의 LSTM버전에 사용되는 인코더 - 디코더
import torch.nn as nn
class Encoder_GRU(nn.Module):
def __init__(self, src_vocab, src_emb_dim, rnn_dim,
num_layer=1, bi=False):
super(Encoder_GRU, self).__init__()
# <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
# word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
self.embed = nn.Embedding(src_vocab, src_emb_dim,
padding_idx=0)
self.gru = nn.GRU(input_size=src_emb_dim,#언어모델 입력차원
hidden_size=rnn_dim, #언어모델 출력차원
num_layers=num_layer, #언어모델 몇층?
bidirectional=bi, #양방향학습 On?
batch_first=True) #왠만하면 True
def forward(self, x): # x의 차원 : (BS, src_seq_len)
emb = self.embed(x) # (BS, src_seq_len, src_emb_dim)
# 인코더에 양방향 학습을 적용한다
# rnn_out : (bs, src_seq_len, hidden_dim * 2)
# hidden : (num_layer * 2, bs, hidden_dim)
rnn_out , hidden = self.gru(emb)
#인코더의 출력은 context_vector
return hidden
import torch.nn as nn
class Decoder_GRU(nn.Module):
def __init__(self, tar_vocab, tar_emb_dim, rnn_dim,
num_layer=1, bi=False):
super(Decoder_GRU, self).__init__()
# <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
# word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
self.embed = nn.Embedding(tar_vocab, tar_emb_dim,
padding_idx=0)
self.gru = nn.GRU(input_size=tar_emb_dim,#언어모델 입력차원
hidden_size=rnn_dim, #언어모델 출력차원
num_layers=num_layer, #언어모델 몇층?
bidirectional=bi, #양방향학습 On?
batch_first=True) #왠만하면 True
# 디코더의 출력은 정답(번역문)의 seq_len이 되게 해야함
# 맞춰야 하는 클래스 개수는 정답지의 단어 개수임
if bi : #양방향으로 학습시에는 FC 레이어 입력차원이 두배
self.fc = nn.Linear(rnn_dim*2, tar_vocab)
else:
self.fc = nn.Linear(rnn_dim, tar_vocab)
# 디코더는 인코더의 context_vector을 초기 hidden으로 입력받는다.
def forward(self, x, hidden): # x의 차원 : (BS, tar_seq_len)
emb = self.embed(x) # (BS, tar_seq_len, tar_emb_dim)
# 디코더에 양방향 학습을 적용한다
# rnn_out : (bs, tar_seq_len, hidden_dim * 2)
# hidden : (num_layer * 2, bs, hidden_dim)
rnn_out, hidden = self.gru(emb, hidden)
output = self.fc(rnn_out)
# 최종 출력은 (bs, seq_len, tar_vocab)
return output, hidden
여기는 Seq2Seq의 GRU버전에 사용되는 인코더 - 디코더
import torch
import torch.nn as nn
class Seq2Seq_LSTM(nn.Module):
def __init__(self, encoder, decoder, spec_token, max_len=None):
super(Seq2Seq_LSTM, self).__init__()
self.encode = encoder
self.decode = decoder
# 최대 디코딩 길이 설정
self.max_len = max_len
# 스페셜 토큰을 초기화에 입력하게 변경
self.spec_token = spec_token
def forward(self, src, tar=None, TF_ratio=1):
# 인코더의 출력 = context_vector
context_h, context_c = self.encode(src)
# 스페셜 토큰에서 SOS, EOS, PAD의 정수인코딩값 추출
en_sos = self.spec_token.index('<SOS>')
en_eos = self.spec_token.index('<EOS>')
en_pad = self.spec_token.index('<PAD>')
if tar is not None:
# 배치사이즈, 연산위치 정보 추출(tar 기준으로)
BS, tar_seq_len = tar.size()
device = tar.device
# 디코더의 첫번째 토큰을 <SOS>에 (BS, 1)차원으로 생성
# 이때 정답지(tar)의 맨 앞토큰은 <SOS>로 채워져 있으니 이를 이용한다.
input_token = tar[:, 0].unsqueeze(1)
outputs = [] # 출력 디코드 결과를 저장
# 임의 난수를 (BS)차원으로 생성 후 TF_ratio비율정보를 받아서
# 마스크 플래그로 변환, 이때 TF_ratio는 0~1 사이값
# 1에 가까울수록 대부분의 Flag는 True가 되서 지도학습비율이 올라감
TF_flag = torch.rand(BS, device=device) < TF_ratio
# 토큰 단위로 예측이니 자가 회귀 방식임
h, c = context_h, context_c
# tar seq는 맨 처음 토큰을 <SOS>로 채웟으니 1번부터 시작
for t in range(1, tar_seq_len):
# 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
out_tokens, h, c = self.decode(input_token, h, c)
outputs.append(out_tokens)
# 마스크 플래그가 True : 지도학습 방식으로 동작
# 마스크 플래그가 False : 비지도학습-자가 회귀방식으로 동작
input_token = torch.where(TF_flag.unsqueeze(1),
tar[:, t].unsqueeze(1),
out_tokens.argmax(dim=-1))
# 최종 출력 모양 조정
outputs = torch.cat(outputs, dim=1) # (BS, tar_seq_len-1, tar_vocab)
# (BS, 1, tar_vocab) 차원의 sos 토큰 인덱스로 채워진 텐서를 만듬
sos_tokens = torch.full((BS, 1, outputs.size(-1)), en_sos, device=device)
# sos_tokens랑 outputs를 합쳐서 (BS, tar_seq_len, tar_vocab)가 되게 함
outputs = torch.cat([sos_tokens, outputs], dim=1)
return outputs # (BS, tar_seq_len, tar_vocab)
else: # 정답지가 없는 평가모드
# 배치사이즈, 연산위치 정보 추출(src 기준으로)
BS, src_seq_len = src.size()
device = src.device
# 최대 디코딩 길이 지정 안했으면 원문 seq_len을 쓰자
if self.max_len is None:
self.max_len = src_seq_len
# 디코더의 첫번째 토큰을 <SOS>에 (BS, 1)차원으로 생성
input_token = torch.tensor([[en_sos]] * BS).to(device)
outputs = [] # 출력 디코드 결과를 저장
# EOS를 샘플별로 예측하면 그 뒷단을 PAD로 채우는 flag 텐서
pad_mask_flag = torch.zeros(BS, dtype=torch.bool,
device=device)
# 만약에 Batch내 샘플이 <EOS>예측했다면 해당 샘플의 flag가 올라간다.
# 토큰 단위로 예측이니 자가 회귀 방식임
h, c = context_h, context_c
for _ in range(self.max_len):
# 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
out_tokens, h, c = self.decode(input_token, h, c)
# 추론 과정이니 next_token은
# Greedy Search-> (bs, 1)결과가 됨
next_token = out_tokens.argmax(dim=-1)
outputs.append(next_token)
# 배치 내 샘플이 <EOS>토큰을 예측한다면 해당샘플의 flag를 True로 올린다.
# 이때 next_token = (bs, 1) -> (bs)로 차원 축소 후 인디케이터 연산해야함
eos_indices = (next_token.squeeze(1) == en_eos)
# OR연산이니까 EOS예측된 마스크는 계속 True로 남는다.
pad_mask_flag = pad_mask_flag | eos_indices
# 모든 샘플에서 EOS를 예측한 경우, 더이상 수행하지 않고 탈출
if pad_mask_flag.all():
break
# 자가 회귀방식이니 다음 입력을 갱신한다.
input_token = torch.where(pad_mask_flag.unsqueeze(1),
torch.full_like(next_token, en_pad),
next_token)
# 예측된 단어들을 모아서 최종 출력으로 반환
# 이때 최종 출력은 # (BS, max_len) 규격을 맞추기 위해
# PAD토큰을 채우는 과정을 수행한다.
while len(outputs) < self.max_len:
outputs.append(torch.full((BS, 1), en_pad,
device=device))
outputs = torch.cat(outputs, dim=1) # (BS, max_len)
return outputs

마지막 훈련 코드의 최종 output는 번역문(tar)의 맨 앞 토큰이 SOS이기에 이를 감안하여
출력 전에 SOS토큰을 모든 후보 시퀀스 데이터에 붙여줘야한다.
import torch
import torch.nn as nn
class Seq2Seq_GRU(nn.Module):
def __init__(self, encoder, decoder, spec_token, max_len=None):
super(Seq2Seq_GRU, self).__init__()
self.encode = encoder
self.decode = decoder
# 최대 디코딩 길이 설정
self.max_len = max_len
# 스페셜 토큰을 초기화에 입력하게 변경
self.spec_token = spec_token
def forward(self, src, tar=None, TF_ratio=1):
# 인코더의 출력 = context_vector
context_h = self.encode(src)
# 스페셜 토큰에서 SOS, EOS, PAD의 정수인코딩값 추출
en_sos = self.spec_token.index('<SOS>')
en_eos = self.spec_token.index('<EOS>')
en_pad = self.spec_token.index('<PAD>')
if tar is not None:
# 배치사이즈, 연산위치 정보 추출(tar 기준으로)
BS, tar_seq_len = tar.size()
device = tar.device
# 디코더의 첫번째 토큰을 <SOS>에 (BS, 1)차원으로 생성
# 이때 정답지(tar)의 맨 앞토큰은 <SOS>로 채워져 있으니 이를 이용한다.
input_token = tar[:, 0].unsqueeze(1)
outputs = [] # 출력 디코드 결과를 저장
# 임의 난수를 (BS)차원으로 생성 후 TF_ratio비율정보를 받아서
# 마스크 플래그로 변환, 이때 TF_ratio는 0~1 사이값
# 1에 가까울수록 대부분의 Flag는 True가 되서 지도학습비율이 올라감
TF_flag = torch.rand(BS, device=device) < TF_ratio
# 토큰 단위로 예측이니 자가 회귀 방식임
h = context_h
# tar seq는 맨 처음 토큰을 <SOS>로 채웟으니 1번부터 시작
for t in range(1, tar_seq_len):
# 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
out_tokens, h = self.decode(input_token, h)
outputs.append(out_tokens)
# 마스크 플래그가 True : 지도학습 방식으로 동작
# 마스크 플래그가 False : 비지도학습-자가 회귀방식으로 동작
input_token = torch.where(TF_flag.unsqueeze(1),
tar[:, t].unsqueeze(1),
out_tokens.argmax(dim=-1))
# 최종 출력 모양 조정
outputs = torch.cat(outputs, dim=1) # (BS, tar_seq_len-1, tar_vocab)
# (BS, 1, tar_vocab) 차원의 sos 토큰 인덱스로 채워진 텐서를 만듬
sos_tokens = torch.full((BS, 1, outputs.size(-1)), en_sos, device=device)
# sos_tokens랑 outputs를 합쳐서 (BS, tar_seq_len, tar_vocab)가 되게 함
outputs = torch.cat([sos_tokens, outputs], dim=1)
return outputs # (BS, tar_seq_len, tar_vocab)
else: # 정답지가 없는 평가모드
# 배치사이즈, 연산위치 정보 추출(src 기준으로)
BS, src_seq_len = src.size()
device = src.device
# 최대 디코딩 길이 지정 안했으면 원문 seq_len을 쓰자
if self.max_len is None:
self.max_len = src_seq_len
# 디코더의 첫번째 토큰을 <SOS>에 (BS, 1)차원으로 생성
input_token = torch.tensor([[en_sos]] * BS).to(device)
outputs = [] # 출력 디코드 결과를 저장
# EOS를 샘플별로 예측하면 그 뒷단을 PAD로 채우는 flag 텐서
pad_mask_flag = torch.zeros(BS, dtype=torch.bool,
device=device)
# 만약에 Batch내 샘플이 <EOS>예측했다면 해당 샘플의 flag가 올라간다.
# 토큰 단위로 예측이니 자가 회귀 방식임
h = context_h
for _ in range(self.max_len):
# 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
out_tokens, h = self.decode(input_token, h)
# 추론 과정이니 next_token은
# Greedy Search-> (bs, 1)결과가 됨
next_token = out_tokens.argmax(dim=-1)
outputs.append(next_token)
# 배치 내 샘플이 <EOS>토큰을 예측한다면 해당샘플의 flag를 True로 올린다.
# 이때 next_token = (bs, 1) -> (bs)로 차원 축소 후 인디케이터 연산해야함
eos_indices = (next_token.squeeze(1) == en_eos)
# OR연산이니까 EOS예측된 마스크는 계속 True로 남는다.
pad_mask_flag = pad_mask_flag | eos_indices
# 모든 샘플에서 EOS를 예측한 경우, 더이상 수행하지 않고 탈출
if pad_mask_flag.all():
break
# 자가 회귀방식이니 다음 입력을 갱신한다.
input_token = torch.where(pad_mask_flag.unsqueeze(1),
torch.full_like(next_token, en_pad),
next_token)
# 예측된 단어들을 모아서 최종 출력으로 반환
# 이때 최종 출력은 # (BS, max_len) 규격을 맞추기 위해
# PAD토큰을 채우는 과정을 수행한다.
while len(outputs) < self.max_len:
outputs.append(torch.full((BS, 1), en_pad,
device=device))
outputs = torch.cat(outputs, dim=1) # (BS, max_len)
return outputs

GRU버전도 훈련시 출력 시퀀스 맨 앞에 SOS토큰 텐서 붙이는걸 잊지 말자
학습 준비
# 학습 실험 조건을 구분하기 위한 키
model_key = ['LSTM', 'GRU']
metrics_key = ['Loss', '정확도']
# Seq2Seq의 LSTM버전 인스턴스화
encoder_lstm = Encoder_LSTM(src_VS, EMB_DIM, UNIT_DIM,
num_layer=NUM_Layers, bi=BI_DIR)
decoder_lstm = Decoder_LSTM(tar_VS, EMB_DIM, UNIT_DIM,
num_layer=NUM_Layers, bi=BI_DIR)
Translater_lstm = Seq2Seq_LSTM(encoder_lstm, decoder_lstm,
spec_token=spec_token, max_len=tar_SL)
# Seq2Seq의 GRU버전 인스턴스화
encoder_gru = Encoder_GRU(src_VS, EMB_DIM, UNIT_DIM,
num_layer=NUM_Layers, bi=BI_DIR)
decoder_gru = Decoder_GRU(tar_VS, EMB_DIM, UNIT_DIM,
num_layer=NUM_Layers, bi=BI_DIR)
Translater_gru = Seq2Seq_GRU(encoder_gru, decoder_gru,
spec_token=spec_token, max_len=tar_SL)
# GPU사용 가능 유/무 확인
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
models = {} # 딕셔너리
models[model_key[0]] = Translater_lstm.to(device)
models[model_key[1]] = Translater_gru.to(device)
import torch.optim as optim
# 로스함수 및 옵티마이저 설계
# 로스함수에서 <PAD> 토큰의 정수인덱스 번호 -> 0번에 대해서는
# 틀리건 맞건 무시하겠다 : ignore_index에 해당 정수 인덱스 번호 기입
ignore_class_idx = spec_token.index('<PAD>')
criterion = nn.CrossEntropyLoss(ignore_index=ignore_class_idx)
LR = 0.001 # 러닝레이트는 통일
optimizers = {}
optimizers[model_key[0]] = optim.Adam(Translater_lstm.parameters(), lr=LR)
optimizers[model_key[1]] = optim.Adam(Translater_gru.parameters(), lr=LR)
여기까지 수행했다면 모델 학습을 위한 옵티마이저, 손실함수 등의 인스턴스화(준비)작업은 마친 것이다.
Techer forcing 스케줄러 설계
Techer forcing의 비율을 고정값 (0.5) 이런게 아니라 epoch별로 서서히 감소하는 형식으로 설계를 수행하려 한다.

대략 위와 같은 그래프로 학습 초기 epoch 에는 1에 가까운 값이 입력되고(Techer forcing가 100%적용)
학습 후기 epoch에서는 Techer forcing비율을 0%에 가깝게 하여
대부분의 학습이 자가회귀 모델 형식으로 동작하게 설계하고자 한다.
이를 위한 클래스 설계 코드는 아래와 같다.
# Techer Forcing 비율을 epoch가 증가할수록 감소하기 위한 클래스
class TeacherForcingScheduler:
def __init__(self, init_TF=1.0, Damping_Factor=0.8, min_TF=0.05):
self.init_TF = init_TF
self.DF = Damping_Factor
self.min_TF = min_TF # 최소 Teacher Forcing 비율
self.current_epoch = 0
def get_ratio(self):
# 감쇄인자를 활용하여 TF_ratio를 감소
TF_ratio = self.init_TF * (self.DF ** self.current_epoch)
return max(self.min_TF, TF_ratio)
def step(self):
# 에폭 수를 증가시켜 비율을 감소시킴
self.current_epoch += 1
def demo(self, num_epoch=40):
# 에폭에 따른 Teacher Forcing 비율 변화를 그래프로 시각화
tf_ratios = []
for epoch in range(num_epoch):
tf_ratios.append(self.get_ratio())
self.step()
plt.figure(figsize=(6, 4))
plt.plot(range(num_epoch), tf_ratios)
plt.xlabel("epoch")
plt.ylabel("TF-ratio")
plt.title("TF Ratio Changes with Increasing Epochs")
plt.show()
위 클래스처럼 설계하면
Optimizer처럼 매 epoch가 올라갈 때마다 간단하게
TeacherForcingScheduler.step() 메서드만 입력하면
자동으로 Techer forcing 비율이 감소한다.
위 코드는 Seq_trainer.py 코드에 삽입하여 업데이트한다.
from Seq_trainer import *
num_epoch = 40 #총 훈련/검증 epoch값
ES = 8 # 디스플레이용 에포크 스텝
# Techer forcing 비율을 스케줄러 형식으로 조정하기 위한 클래스 객체화
TF_ratio = 1.0
TF_schedular = TeacherForcingScheduler(TF_ratio,
Damping_Factor=0.95,
min_TF= 0.4)
추가로 TF_schedular.demo(num_epoch) 메서드를 활용하면 epoch가 증가함에 따라 Techer forcing비율값인 TF_ratio가 어떻게 변하는지 그래프로 확인하는 코드를 삽입했다.

모델 훈련/평가 코드 업데이트
Seq_trainer.py 코드의
model_train함수와 model_evaluate함수도 업데이트가 필요하다
def model_train(model, data_loader, loss_fn, optimizer_fn,
epoch, epoch_step,
ignore_class=None, TF_schedular=None):
# 1개의 epoch내 batch단위(iter)로 연산되는 값이 저장되는 변수들
iter_size, iter_loss, iter_correct = 0, 0, 0
device = next(model.parameters()).device # 모델의 연산위치 확인
model.train() # 모델을 훈련모드로 설정
#특정 epoch_step 단위마다 tqdm 진행바가 생성되게 설정
if (epoch+1) % epoch_step == 0 or epoch == 0:
tqdm_loader = tqdm(data_loader)
else:
tqdm_loader = data_loader
for src_data, tar_data in tqdm_loader:
src_data, tar_data = src_data.to(device), tar_data.to(device)
# 번역문의 PAD토큰은 마스크 처리하자
tar_mask = (tar_data != ignore_class)
# 현재의 Techer forcing ratio 정보를 가져오기
TF_ratio = TF_schedular.get_ratio() if TF_schedular is not None else 1.0
# Forward, 모델이 예측값을 만들게 함
tar_pred = model(src_data, tar_data, TF_ratio=TF_ratio)
# 데이터의 적절한 구조변환 수행
tar_pred, tar_data, tar_mask = data_reshape(tar_pred, tar_data, tar_mask)
# 구조변환을 해야 LossFn에 입력가능한 차원이 됨
loss = loss_fn(tar_pred, tar_data)
#backward 과정 수행
optimizer_fn.zero_grad()
loss.backward()
optimizer_fn.step() # 마지막에 스케줄러 있으면 업뎃코드넣기
# Teacher Forcing 비율 업데이트
if TF_schedular is not None:
TF_schedular.step()
BS = src_data.size(0) # Batch내 샘플(iter)연산용 인자값 추출
# 현재 batch 내 샘플 개수당 correct, loss, 수행 샘플 개수 구하기
iter_correct += cal_correct(tar_pred, tar_data, tar_mask) * BS
iter_loss += loss.item() * BS
iter_size += BS
# tqdm에 현재 진행상태를 출력하기 위한 코드
if (epoch+1) % epoch_step == 0 or epoch == 0:
prograss_loss = iter_loss / iter_size
prograss_acc = iter_correct / iter_size
desc = (f"[훈련중]로스: {prograss_loss:.3f}, "
f"정확도: {prograss_acc:.3f}")
tqdm_loader.set_description(desc)
#현재 epoch에 대한 종합적인 정확도/로스 계산
epoch_acc = iter_correct / iter_size
epoch_loss = iter_loss / len(data_loader.dataset)
return epoch_loss, epoch_acc
model_train함수의 주요 업데이트 사항은

Techer forcing 인자값이 모델의 forward 과정에 포함되는 것이며,
해당 인자값이 TeacherForcingScheduler 클래스로 학습이 진행될수록 서서히 0에 가까운 값이 되게끔 스케줄링하는 코드가 추가된다.
def model_evaluate(model, data_loader, loss_fn,
epoch, epoch_step, ignore_class=None):
# 1개의 epoch내 batch단위(iter)로 연산되는 값이 저장되는 변수들
iter_size, iter_loss, iter_correct = 0, 0, 0
device = next(model.parameters()).device # 모델의 연산위치 확인
model.eval() # 모델을 평가 모드로 설정
#특정 epoch_step 단위마다 tqdm 진행바가 생성되게 설정
if (epoch+1) % epoch_step == 0 or epoch == 0:
tqdm_loader = tqdm(data_loader)
else:
tqdm_loader = data_loader
with torch.no_grad(): #평가모드에서는 그래디언트 계산 중단
for src_data, tar_data in tqdm_loader:
src_data, tar_data = src_data.to(device), tar_data.to(device)
# 번역문의 PAD토큰은 마스크 처리하자
tar_mask = (tar_data != ignore_class)
# Forward, 모델이 예측값 비지도-자기회귀모델 방식으로 만들게 함
tar_pred = model(src_data, tar_data, TF_ratio=0.0)
# 데이터의 적절한 구조변환 수행
tar_pred, tar_data, tar_mask = data_reshape(tar_pred, tar_data, tar_mask)
# 구조변환을 해야 LossFn에 입력가능한 차원이 됨
loss = loss_fn(tar_pred, tar_data)
BS = src_data.size(0) # Batch내 샘플(iter)연산용 인자값 추출
# 현재 batch 내 샘플 개수당 correct, loss, 수행 샘플 개수 구하기
iter_correct += cal_correct(tar_pred, tar_data, tar_mask) * BS
iter_loss += loss.item() * BS
iter_size += BS
#현재 epoch에 대한 종합적인 정확도/로스 계산
epoch_acc = iter_correct / iter_size
epoch_loss = iter_loss / len(data_loader.dataset)
return epoch_loss, epoch_acc
다음으로 model_evaluate의 주요 변경사항은

모델의 forward부분에서 검증하고자 하는 대상은
Seq2Seq모델의 단어 생성 성능을 검증하려 하기에
Techer forcing 인자값을 0.0을 주어서 모든 검증을 비지도-자가회귀모델로 단어생성 능력만을 검증한다.
학습/검증 수행
# 학습/검증 정보 저장
history = {key: {metric: []
for metric in metrics_key}
for key in model_key}
for key in model_key:
print(f"\n--현재 훈련중인 조건: [Seq2Seq_{key}]--") # 조건에 맞는 실험시작
for epoch in range(num_epoch): #에포크별 모델 훈련/검증
# 모델 훈련
train_loss, train_acc = model_train(
models[key], trainloader, criterion,
optimizers[key], epoch, ES,
ignore_class=ignore_class_idx, #무시할 클래스 인덱스
TF_schedular=TF_schedular #Techer forcing비율 조정 함수
)
#모델 검증
val_loss, val_acc = model_evaluate(
models[key], valloader, criterion,
epoch, ES,
ignore_class=ignore_class_idx, #무시할 클래스 인덱스
)
# 손실 및 성과 지표를 history에 저장
history[key]['Loss'].append((train_loss, val_loss))
history[key]['정확도'].append((train_acc, val_acc))
# Epoch_step(ES)일 때마다 print수행
if (epoch+1) % ES == 0 or epoch == 0:
print(f"epoch {epoch+1:03d}," + "\t" +
f"훈련 [Loss: {train_loss:.3f}, " +
f"Acc: {train_acc*100:.2f}%]")
print(f"epoch {epoch+1:03d}," + "\t" +
f"검증 [Loss: {val_loss:.3f}, " +
f"Acc: {val_acc*100:.2f}%]")
print(f"--[Seq2Seq_{key}] 훈련 종료--\n") # 조건에 맞는 실험종료

정확도가 많이 낮아진것을 보니 제대로 학습이 되고 있는 듯 하다
사실 LSTM, GRU버전의 Seq2Seq의 번역품질이 좋지 않아서
Transformer이 개발된 역사를 생각한다면
이정도 정확도가 적합하게 나오는 듯 하다.
학습 결과 분석
import matplotlib.pyplot as plt
# 한글 사용을 위한 폰트 포함
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False
# 학습/검증 결과 데이터를 재배치
res_data = {}
for key in model_key:
res_data[key] = {}
for metric in metrics_key:
# 각 모델의 메트릭 데이터 추출
metric_data = history[key][metric]
# 훈련 및 검증 값 분리
train_values = [tup[0] for tup in metric_data]
val_values = [tup[1] for tup in metric_data]
res_data[key][f'훈련_{metric}'] = train_values
res_data[key][f'검증_{metric}'] = val_values
# 손실 및 정확도 그래프 그리기 그래프 생성
fig, axes = plt.subplots(2, 2, figsize=(12, 12))
axes = axes.flatten() # 2차원 배열을 1차원으로 변환하여 인덱싱 쉽게 함
# 손실 그래프 그리기
for idx, key in enumerate(model_key):
ax = axes[idx*2] #손실 그래프는 0, 2번째에 위치
ax.plot(res_data[key]['훈련_Loss'], label='훈련 로스')
ax.plot(res_data[key]['검증_Loss'], label='검증 로스')
ax.set_title(f'Seq2Seq_{key} 번역 Loss', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
# 정확도 그래프 그리기
for idx, key in enumerate(model_key):
ax = axes[idx*2 + 1] #정확도 그래프는 1, 3번째에 위치
ax.plot(res_data[key]['훈련_정확도'], label='훈련 정확도')
ax.plot(res_data[key]['검증_정확도'], label='검증 정확도')
ax.set_title(f'Seq2Seq_{key} 번역 정확도', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('정확도')
ax.legend()
plt.tight_layout()
plt.show()

# 모든 모델의 손실과 정확도를 비교하는 그래프 생성
fig, axes = plt.subplots(1, 2, figsize=(16, 8))
# 모든 모델의 손실 그래프
ax = axes[0]
for key in model_key:
ax.plot(res_data[key]['훈련_Loss'], label=f'Seq2Se2_{key} 훈련 로스')
ax.plot(res_data[key]['검증_Loss'], label=f'Seq2Se2_{key} 검증 로스', linestyle='--')
ax.set_title('모든 조건별 모델의 훈련/검증 Loss', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend(fontsize=16) # 범례의 폰트 크기 설정
# 모든 모델의 정확도 그래프
ax = axes[1]
for key in model_key:
ax.plot(res_data[key]['훈련_정확도'], label=f'Seq2Se2_{key} 훈련 정확도')
ax.plot(res_data[key]['검증_정확도'], label=f'Seq2Se2_{key} 검증 정확도', linestyle='--')
ax.set_title('모든 조건별 모델의 훈련/검증 정확도', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('정확도')
ax.legend(fontsize=16) # 범례의 폰트 크기 설정
plt.tight_layout()
plt.show()

이거는 한번 추론까지 해봐서 생성 task인 기계번역이 얼마나 잘 되는지도 검증을 해봐야 겠다.
모델 저장/불러오기
# 학습된 모델 저장
path = {} #모델별 경로명 저장
for mk in model_key:
path[mk] = f'Seq2Seq_{mk}.pth'
torch.save(models[mk].state_dict(), path[mk])
# 저장된 모델 불러오기
load_model = {
'LSTM': Translater_lstm, # LSTM 모델 인스턴스 생성
'GRU': Translater_gru # GRU 모델 인스턴스 생성
}
for mk in model_key:
load_model[mk].load_state_dict(torch.load(path[mk], weights_only=True))
#추론기는 CPU에서 돌리자
load_model[mk] = load_model[mk].to('cpu')
테스트 샘플 추출
import random
# 테스트 데이터셋에서 샘플을 추출
# 전체 테스트 데이터 개수정보를 추출
num_test = padded_src_test.shape[0]
sample_epoch = 10 #추출할 샘플 개수 정의
indices = random.sample(range(num_test), sample_epoch)
# 추출한 샘플번호를 바탕으로 Test 데이터셋에서 무작위 추출
S_src_test = padded_src_test[indices]
S_tar_test = padded_tar_test[indices]
# 원문 데이터만 텐서 자료형으로 변환
TS_src_test = torch.tensor(S_src_test, dtype=torch.long)
여기까지 수행했으면 평가 데이터셋에서 임의의 샘플 10개를 추출한 추론 직전 단계이다.
추론 함수 업데이트
# 모델 추론용 함수
def model_inference(model, src_data):
device = next(model.parameters()).device # 모델의 연산위치 확인
model.eval() # 모델을 평가 모드로 설정
with torch.no_grad(): #평가모드에서는 그래디언트 계산 중단
src_data = src_data.to(device)
# 추론에서 입력되는 데이터 구조는 (1, src_seq_len)이다.
# 추론 과정이기에 정답지(tar_data)는 입력하지 않는다.
tar_infer = model(src_data)
# 추론결과는 (BS=1, max_len) -> numpy자료형 변환
nd_tar_infer = tar_infer.cpu().numpy()
return nd_tar_infer[0] #BS 차원을 날림
업데이트 사항은
tar_infer = model(src_data)
이 부분으로 spec_token을 클래스 생성자로 이전했기에
더이상 forward과정에서 spec_token을 참조하지 않는다.
추론
# 추론 결과를 저장할 딕셔너리
tar_dict = {key: [] for key in model_key}
for key in model_key:
print(f"\n--현재 추론 조건: [Seq2Seq_{key}]--") # 조건에 맞는 실험시작
for idx in tqdm(range(sample_epoch)): #추론 에포크별 추론 시작
# 입력되는 원문 차원을 (1, src_seq_len)으로 만들기 위한 코드
iter_src_data = TS_src_test[idx].unsqueeze(0)
# 모델 추론 -> 추론결과는 (bs, max_len) ndarray타입임
tar_infer_doc = model_inference(load_model[key], iter_src_data)
tar_dict[key].append(tar_infer_doc)

추론은 문제없이 수행되는것을 확인할 수 있다.
추론 후처리
def Translater_post_processor(encode_doc, idx_to_word, spec_token):
decode_doc = ['<SOS>'] #코드 안정성을 위해 맨앞에 토큰 추가
for en_word in encode_doc:
#인코딩된 단어 -> 디코드화
#디코딩한 단어가 없는 경우 스페셜토큰의 <UNK>로 변경
de_word = idx_to_word.get(en_word, spec_token[1])
# 디코딩한 단어가 subword인 경우
if de_word.startswith('##'):
decode_doc[-1] += de_word[2:]
# 디코딩한 결과가 스페셜 토큰이 아닌것만 문장에 추가
elif de_word not in spec_token:
decode_doc.append(de_word)
decode_doc.pop(0) #리스트의 맨 앞 원소 삭제
return ' '.join(decode_doc)
이 함수는 전처리 함수 모음집인 NLP_pp.py 모듈에 추가하여 업데이트하고자 한다.
idx_list = range(1, sample_epoch+1)
for idx, src, pred_lstm, pred_gru, tar in zip(idx_list, S_src_test,
tar_dict['LSTM'],
tar_dict['GRU'],
S_tar_test):
# 원문, 모델번역문_1, 모델번역문_2, 정답번역문 순으로 디코딩
decode_src = Translater_post_processor(src, idx_to_src, spec_token)
de_pred_lstm = Translater_post_processor(pred_lstm, idx_to_tar, spec_token)
de_pred_gru = Translater_post_processor(pred_gru, idx_to_tar, spec_token)
decode_tar = Translater_post_processor(tar, idx_to_tar, spec_token)
print(f"{idx}번째 번역 결과 확인")
print(f"원문(src) : {decode_src}")
print(f"seqLSTM번역 : {de_pred_lstm}")
print(f"seq-GRU번역 : {de_pred_gru}")
print(f"번역문(tar) : {decode_tar}")
print("==============================\n")
이제 후처리 결과물을 확인하고자 한다.

번역 품질은 여전히 형편없기는 하지만
이전포스트 NLP- Seq to Seq (4) : 번역보다는 괄목할만한 성장이라 볼 수 있겠다.
다음 포스트는 BLEU Score와 Beam sarch로 번역 품질을 좀 더 향상시킬 수 있는 방안에 대해 탐구하고자 한다.
https://github.com/tbvjvsladla/Seq2Seq
이번 포스트의 실습 내용은

위 깃허브 저장소에 업로드하였습니다.
NLP_pp.py, Seq_trainer.py파일은 이전포스트에서 일부 함수가 업데이트 되었기에
이전포스트의 실습을 수행할 시 버전을 잘 확인 부탁드립니다.
감사합니다.