NLP- Seq to Seq (4) : 번역

안상훈·2024년 11월 4일

AI핵심기술

목록 보기
16/21
post-thumbnail

개요

본 블로그 포스팅은 수도권 ICT 이노베이션 스퀘어에서 진행하는 AI 핵심 기술 집중 클래스의 자연어처리(NLP) 강좌 내용을 필자가 다시 복기한 내용에 관한 것입니다.


1. seq2seq

Seq to Seq Learning with NN 줄여서 seq2seq는 자연어 처리 분야 중 '기계번역'부분에서 획기적인 변화를 가져온 논문으로 아래의 그림처럼 2개의 언어모델을 인코더 - 디코더로 연결하는 구조를 제안했다.

위 사진으로 표현한 seq2seq 구조로 알 수 있듯이
seq2seq인코더언어모델, 디코더언어모델 두개를 엮어 모델을 설계했으며

해당 모델을 학습시키는 방법은 지도학습
원문과 정답지(원문에 대한 올바른 번역문)은 각각
문서의 접두, 접미에 특별한 토큰 : SOS, EOS을 삽입해 문서의 시작과 끝을 표기한다.

seq2seq를 구성하는 언어모델 중 인코더RNN(LSTM/GRU)계열의 언어모델을 사용하며
입력된 원문에 대한 압축된 정보 Context Vector를 생성한다.

Context Vector디코더 언어모델의 초기 hidden_state로 활용해 정답 시퀀스(번역문)을 생성할 수 있도록 학습을 진행한다.

seq2seq 논문이 제안되면서 기존의 NLP-기계번역 분야에서 가져온 개선사항을 요약하면 아래와 같다.

💎 기존의 기계번역 모델은 통계적 기계 번역(SMT)를 베이스로 SMT를 개선하기 위한 여러 규칙 및 통계적 방법론을 추가하는 식이었으나, seq2seq 논문을 통해 본격적으로 기계번역에 언어 모델을 도입하기 시작함

인코더 - 디코더구조를 통해 번역 대상 원문을 문맥 정보가 포함된 Context Vector로 압축 \rightarrow 이를 디코더가 학습하여, 번역된 문장이 문맥을 고려, 더 올바른 번역문을 생성함

가변길이 시퀀스 처리 : 논문에서는 고정 길이 문장을 번역하는 SMT 모델에 대한 비판을 수행했으나, SMT 모델의 개념까지 설명하기에는 좀 길어서 쉽게 NER 모델을 바탕으로 seq2seq의 장점을 설명하고자 한다.

위 사진처럼 NER을 수행하는 Bi-LSTM/GRU모델로도 기계번역을 수행할 수는 있다.

단어장에 각 단어별로 원문 \Leftrightarrow 번역문의 조합이 가능하니
이를 word(token) \Leftrightarrow tag 관계로 설정하여 NER tagging을 수행하는 언어모델을 그대로 적용해 번역 task를 수행이 가능하다

그러나, 위 사진에서 알 수 있듯이 번역 품질이 좋지 못한데

첫번째로, 입력 원문의 토큰 개수랑 고정되는 번역문 시퀀스가 출력되며,
다음으로, 한글\leftrightarrow영어 번역처럼

한글 : 주어 - 목적어 - 동사 (SOV) 구조
영어 : 주어 - 동사 - 목적어 (SVO) 구조

일 때는 번역문의 구조가 깨지는 문제가 발생한다.

즉, 서순이 망가진다

서순은 역시 중요하다.
따라서, seq2seq는 인코더, 디코더의 시퀀스 길이가 가변되니 이에 따라 Recurrent(순환) 횟수를 달리 해도 모델이 정상적으로 동작하고, Context Vector(문맥정보)를 바탕으로 번역문을 생성하기에 대체로 서순을 준수한 번역문을 생성한다.

💎 Beam searchBLEU Score 적용을 통한 더 나은 번역문의 생성 : Seq to Seq Learning with NN 논문에서 제안하는 더 올바른 번역문을 만들기 위한 추가 기법 중 하나로
Beam search는 후보 시퀀스를 고려하여 더 올바른 번역문을 출력하는 과정
BLEU Score는 출력한 번역문이 올바르게 번역되었는지에 대한 평가지표로써, 기존 평가지표인 WER(Word Error Rate), F-Score 대비 더 번역품질을 평가하는데 적합하며, 인간평가(Human Evaluation) 대비 번역품질평가를 주관적인 정성평가 영억에서 객관적이고 정량 평가 영역으로 가져온 새로운 패러다임을 제시했다 볼 수 있다.

Beam search, BLEU Score의 적용 방법은 향후 포스팅을 진행하며, 이번 포스트에서는 seq2seqpytorch로 설계하고 기계번역을 실습하는 것에 집중하려 한다.



2. seq2seq 모델 설계

seq2seq의 모델은 그렇게 어려운 편은 아니나, Techer forcing(교사강요), Beam search, BLEU, 가변 seq 처리 방법, Attention 등 여러가지 성능 향상 기법이 함께 사용되기에
vainilla seq2seq모델을 설계한 뒤 하나하나 기술을 적용해가면서 성능향상을 꾀하고자 한다.


인코더, 디코더 설계

인코더는 언어모델 : Many-to-One 방식으로
디코더는 언어모델 : Many-to-Many 방식으로 동작하면 되며, 이때 디코더의 초기 hidden_state : h0h_0 이 부분만 인코더Context Vector를 받아오면 된다.

RNN계열의 언어모델 소개는 그간 충분히 진행했으니 이해하는데 어려운 편은 아닐것이라 생각한다.

참고로 인코더의 입력 인자값 중 src_emb_dim
디코더tar_emb_dim와 다른 값을 가져도 상관은 없으나

인코더 디코더bi, num_layer은 같은값으로 해야한다.
안그러면 Context Vector의 차원 맞추는게 상당히 까다로워진다.


Seq2Seq 클래스 설계

seq2seq는 정답지(tar)이 존재하는 훈련모드와
정답지가 없는 평가(추론)모드에 따라 모델의 구동방식이 판이하게 달라진다.

대략 그림으로 도식화 한다면 위와 같아지는데
훈련 모드에서는 정답지(tar)문서와
인코더에서 원문(src)문서를 압축한 Context Vector를 같이 입력으로 받은 뒤 이를 디코더가 예측한 번역문(ypredy_{pred})를 출력하는 형식이다.

위 과정으로 계속 학습을 진행한 뒤
정답지(tar)문서가 없는 평가/추론 모드로 진입할 시에는

맨 처음 스페셜 토큰 SOSContext Vector를 입력하여
디코더 모델이 자가 회귀(Autoregressive Generation) 방식으로 계속해서 다음에 올 단어를 예측하다
이제 번역을 종료해도 될거 같으면 스페셜 토큰 EOS를 뱉어내는 식이다.
그 이후부터는 설정한 생성 seq 최대길이 max_len 값에 맞춰서 PAD토큰을 붙여나간다.

여기서 '평가/추론'과정에 속하는 코드의 경우 작성자들 마다 스타일이 판이하게 다르기에 해당부분은 도식과 함께 설명을 진행하도록 하겠다.

먼저 추론과정의 초기화 부분은 위 사진에서 표현한 것처럼
src에서 주요 정보 추출, 디코더의 첫 입력은 SOS로 채워진 (BS,1)차원 데이터가 입력 등의 설정을 진행한다.

다음으로 아래의 반복문 동작이 조금 이해하기가 어려울 듯 한데 이 반복문 동작 구문은 gif로 동작원리를 같이 첨부하니 코드리뷰에 도움이 되길 바란다

seq2seq의 가변 seq를 생성하는 코드는 정말 사람마다 코드짜는 스타일이 다 다르고 천차만별이다.

필자는 flag변수를 사용하는게 친숙해서 이를 활용한 것이지만 각자의 스타일에 맞춰서 seq2seq 클래스 설계나 동작원리를 코드화 하는것은 연습할 필요성이 있다...



3. seq2seq 실습 - 전처리

다음은 seq2seq모델을 활용한 인공 신경망 기계번역 (Neural Machine Translation; NMT) 실습을 수행하고자 한다.

실습 데이터셋으로는 https://www.aihub.or.kr/ AIHub에서 제공하는
방송컨텐츠 한국어-유럽어 번역 말뭉치 데이터셋을 활용하고자 한다.

유럽에서 사용하는 언어 중 독일어, 프랑스어, 이탈리아어에 대하여
한국어 \leftrightarrow 독일어,프랑스어,이탈리아어 번역기를 설계하는데 학습데이터로 활용이 가능하다.

데이터 파싱하기

파일구조가 *.json이고, 여러개의 *.json파일을 열람해서 각 파일당 필요 정보를 추출해야 하니 아래의 코드로
정보추출 \rightarrow Dataframe 타입으로 변환한다.

import os

path_1 = './data/TL_다국어-한국어_deko_기타'
path_2 = './data/TL_다국어-한국어_deko_다큐교양'
path_3 = './data/TL_다국어-한국어_deko_영화드라마'

file_1_list = os.listdir(path_1)
file_2_list = os.listdir(path_2)
file_3_list = os.listdir(path_3)

# 전체 파일 경로 리스트 생성
file_1_paths = [os.path.join(path_1, file) for file in file_1_list]
file_2_paths = [os.path.join(path_2, file) for file in file_2_list]
file_3_paths = [os.path.join(path_3, file) for file in file_3_list]

# 독일어 -> 한국어 데이터의 총 파일경로는 약 12만개
file_paths = file_1_paths + file_2_paths + file_3_paths
import json

parsered_data = [] # 파싱 데이터를 저장할 변수

for file_path in file_paths:
    #json파일 불러오기
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

        # 문서 정보 중 필요항목을 딕셔너리에 담고 리스트에 추가
        raw_dict = {
            "문서ID" : data['ID'],
            "번역" : "독->한",
            "원문(src)" : data['원문'],
            "번역문(tar)" : data['최종번역문']
        }
        parsered_data.append(raw_dict)
import pandas as pd

# 파싱이 완료된 데이터를 Dataframe 형식으로 변환
raw_data = pd.DataFrame(parsered_data)

display(raw_data)


데이터 전처리

데이터 전처리의 수행은 결측치&중복치 제거 및 간단한 데이터 클리닝 작업만 수행한다.

# 데이터셋읜 결측치 & 중복치 제거 함수 실행
raw_data = df_cleaning(raw_data, '원문(src)')
raw_data = df_cleaning(raw_data, '번역문(tar)')
import re

def regex_sub_DE(sent):
    # 입력 텍스트를 소문자로 변환
    sent = sent.lower()
    # 단어와 구두점 사이에 공백을 추가
    sent = re.sub(r"([?.!,¿])", r" \1", sent)
    # 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent.strip()

def regex_sub_KR(sent):
    # 단어와 구두점 사이에 공백을 추가
    sent = re.sub(r"([?.!,¿])", r" \1", sent)
    # 단순하게 텍스트에서 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent.strip()
# 독일어, 한국어에 대한 데이터 클리닝 수행
raw_data['원문(src)'] = raw_data['원문(src)'].apply(regex_sub_DE)
raw_data['번역문(tar)'] = raw_data['번역문(tar)'].apply(regex_sub_KR)

데이터 클리닝을 수행한 총 데이터 개수는 약 12만개임을 기억하자

# 인코더의 입력 데이터인 src 데이터
raw_src_data = raw_data['원문(src)'].values.tolist()
# 디코더의 정답지 데이터인 tar 데이터
raw_tar_data = raw_data['번역문(tar)'].values.tolist()

텍스트 전처리 - 토크나이저

이번 실습에서는 다국어에 대한 단어 분절(토크나이징)을 수행해야 하니 기존 프스트에서 자주 사용한 nltkmecab 같은 기계학습 기반의 특정 언어에만 특화된 토크나이저는 더이상 사용하기에는 어려움이 있다.

따라서 이제부터는 본격적으로 AI모델 기반 토크나이저를 사용하고자 한다.

사용할 딥러닝 기반 토크나이저는 https://huggingface.co/ Hugging Face라는 플랫폼 서비스를 활용하고자 하며,
Hugging Face는 AI가 적용가능한 다양한 작업을 수행하는데 특화된 여러 Pre-trained AI model을 호스팅 서비스하고 있는 오픈소스 플랫폼이다.

해당 플랫폼에는 Transformer(트랜스포머) 계열의 모델이 주로 배포되고 있으며, 여러 연구자들이 본인들이 각각의 작업에 사용하고자 설계한 사전학습한 AI 모델을 업로드, 다른 개발자들이 해당 모델을 다운로드 받는 것이 가능하다.
적용 가능한 작업 분야는 자연어 전처리(NLP) - 토크나이저 뿐만 아니라 AI번역, 텍스트 생성, 감정분석등의 다양한 작업에 활용가능한 모델들도 꽤나 많이 업로드 되어 있다.

이때 Hugging Face는 사전학습 모델의 사용을 용이하게 하기 위해 Inference API, Transfoms라이브러리 등의 도구를 함께 제공해 꽤 손쉽게 사용할 수 있는 장점이 있는 플랫폼이다.

https://huggingface.co/transformers/v3.0.2/index.html 먼저 사용방법에 대해 알아보자면

NLP분야에 대해서 사용가능한 Transformer(트랜스포머) 계열의 아키텍쳐는 Bert, GPT-2, T5 등을 사용할 수 있으며, 각 아키텍쳐별 사용작업에 적합한 Pre-trained AI model을 다운로드 받아 작업(토크나이징)을 수행하면 된다.

필자는 Bert아키텍처에 적용가능한 사전학습 모델을 다운로드받아 이를 활용하고자 하며

https://huggingface.co/models

위 사진처럼 Models 페이지에 접속하여 아키텍쳐명(Bert), NLP작업 대상 언어(German, Kor)을 입력하면 적용하고자 하는 작업에 활용된 적이 있는
사전학습 모델 리스트가 추천순으로 정렬된다.

필자는 독일어 : dbmdz/bert-base-german-uncased
한국어 : kykim/bert-kor-base 모델을 사용하고자 한다.

사용방법에 대한 코드를 살펴본다면

from_pretrained : 사전학습 모델을 다운로드 받는 코드
tokenize : 다운로드한 모델이 지원하는 여러 기능 중 토크나이징 작업 수행
convert_tokens_to_ids 토큰화된 결과값을 정수인코딩까지 수행해주는 메서드

이렇게 3가지 쓰기 쉽게 메서드가 정리되어 있다.

필자는 단어장을 따로 제작할 예정이기에 convert_tokens_to_ids를 제외한 나머지 메서드만 사용하고자 한다.

이때 필자가 NLP전처리 작업에 사용하는 모듈화 라이브러리
NLP_pp.py도 토크나이저 아키텍쳐가 여러 종류가 사용되게 변경되었으니
코드 업데이트를 수행한다.

from transformers import BertTokenizer

# 독일어 토크나이저 모델: dbmdz/bert-base-german-uncased
german_tokenizer = BertTokenizer.from_pretrained("dbmdz/bert-base-german-uncased")
# 한국어 토크나이저 모델: kykim/bert-kor-base
korean_tokenizer = BertTokenizer.from_pretrained("kykim/bert-kor-base")
# 원문 토큰화 수행
tokenized_src_data = tokenize(raw_src_data, german_tokenizer, arch='Bert')
# 번역문 토큰화 수행
tokenized_tar_data = tokenize(raw_tar_data, korean_tokenizer, arch='Bert')

토큰화 수행 결과를 보면 Bert 아키텍쳐 계열은 서브워드 단위로 토큰화를 수행하기에 이에 서브워드 인디케이터인 ## 같은 특수문자가 붙어서 나온다
아키텍쳐벌로 토큰화 알고리즘이 다르다 생각하면 되고 해당 인디케이터는 후처리로 삭제하면 안되는 것이니 유념하자


텍스트 전처리 - 단어장 생성

단어장은
src(원문)용 단어장 \rightarrow 희소단어 삭제 \rightarrow spec token 추가
tar(번역문)용 단어장 \rightarrow 희소단어 삭제 \rightarrow spec token 추가

각각 따로 수행하며, 여기서 spec token은 동일한 토큰을 적용한다.

먼저 단어장을 생성하기 전 훈련/검증/평가 데이터셋으로 분리를 수행하는데 각각 80%/15%/5% 로 수행한다

from sklearn.model_selection import train_test_split

# 훈련/검증/평가를 80%, 15%, 5%로 분할을 수행
# random_state -> 데이터셋을 내누는데 '재현성' 유지를 위해 넣음 -> 안넣어도 됨
# stratify -> y 클래스 비율을 알기 어렵기에 해당 항목은 없앰
src_train, src_etc, tar_train, tar_etc = train_test_split(
    tokenized_src_data, tokenized_tar_data, test_size=0.20
)

# 그 외 데이터셋을 반반으로 Val, Test로 나눔
src_val, src_test, tar_val, tar_test = train_test_split(
    src_etc, tar_etc, test_size=0.25
)

이제 두개의 단어장 src_word_list, tar_word_list을 생성하는데 각각 훈련/검증 데이터만을 활용해 단어장 생성을 수행하자.

from collections import Counter

src_word_list = []
tar_word_list = []
# train항목을 워드 리스트에 입력
for src_sent, tar_sent in zip(src_train, tar_train):
    for word in src_sent:
        src_word_list.append(word)
    for word in tar_sent:
        tar_word_list.append(word)
# val항목을 워드 리스트에 입력
for src_sent, tar_sent in zip(src_train, tar_train):
    for word in src_sent:
        src_word_list.append(word)
    for word in tar_sent:
        tar_word_list.append(word)

# 단어와 해당 단어의 출몰 빈도를 함께 저장하는
# Counter 타입의 변수 생성
src_word_counts = Counter(src_word_list)
tar_word_counts = Counter(tar_word_list)

다음으로 희소단어 분석 및 배제 작업을 수행하자

rare_th = 3 #희소단어의 등장 빈도를 결정하는 파라미터
# 희소단어 등장 빈도를 바탕으로 희소 단어를 배제하기 위해 준비 함수
print(f'---원문(src)에 대한 희소단어 분석---')
src_tot_vocab_cnt, src_rare_vocab_cnt = set_rare_vocab(src_word_counts, rare_th, report=True)
print(f'\n---번역문(tar)에 대한 희소단어 분석---')
tar_tot_vocab_cnt, tar_rare_vocab_cnt = set_rare_vocab(tar_word_counts, rare_th, report=True)

#등장 빈도가 높은 단어 순으로 정렬하기
src_vocab = sorted(src_word_counts, key=src_word_counts.get, reverse=True)
tar_vocab = sorted(tar_word_counts, key=tar_word_counts.get, reverse=True)

# 원문(src)에 대한 희소단어 배제 & 정렬 작업 수행
src_vocab_size = src_tot_vocab_cnt - src_rare_vocab_cnt
src_vocab = src_vocab[:src_vocab_size]
# 번역문(tar에 대한 희소단어 배제 & 정렬 작업 수행
tar_vocab_size = tar_tot_vocab_cnt - tar_rare_vocab_cnt
tar_vocab = tar_vocab[:tar_vocab_size]

이제 희소단어 분석을 완료했으니 거울쌍 딕셔너리 단어장을 생성하도록 하자

# 스페셜 토큰 선언
spec_token = ['<PAD>', '<UNK>', '<SOS>', '<EOS>']
# 스페셜 토큰을 포함한 {단어:단어idx}의 딕셔너리 생성
print(f'---원문(src)에 대한 단어장 분석---')
src_to_idx, idx_to_src = set_word_to_idx(spec_token, src_vocab, 
                                         report=True)
print(f'\n---번역문(tar)에 대한 단어장 분석---')
tar_to_idx, idx_to_tar = set_word_to_idx(spec_token, tar_vocab, 
                                         report=True)


텍스트 전처리 - 정수인코딩

# 원문(src) 데이터셋의 정수 인코딩 수행
e_src_train = text_to_sequences(src_train, src_to_idx)
e_src_val = text_to_sequences(src_val, src_to_idx)
e_src_test = text_to_sequences(src_test, src_to_idx)

# 번역문(tar) 데이터셋의 정수 인코딩 수행
e_tar_train = text_to_sequences(tar_train, tar_to_idx)
e_tar_val = text_to_sequences(tar_val, tar_to_idx)
e_tar_test = text_to_sequences(tar_test, tar_to_idx)

이제 번역문(tar)에 스페셜 토큰인 SOSEOS를 접두/접미에 붙이는 작업을 수행하자.

def head_tail_insert(e_docs, spec_token):
    en_sos = spec_token.index('<SOS>')
    en_eos = spec_token.index('<EOS>')

    res_e_doc = []
    for e_doc in e_docs:
        e_doc.insert(0, en_sos) #맨앞에 sos토큰추가
        e_doc.append(en_eos) #맨 뒤에 eos토큰 추가

        res_e_doc.append(e_doc)

    return res_e_doc
# 번역문(tar)의 접두/접미에 SOS, EOS 토큰 추가
e_tar_train = head_tail_insert(e_tar_train, spec_token)
e_tar_val = head_tail_insert(e_tar_val, spec_token)
e_tar_test = head_tail_insert(e_tar_test, spec_token)

이렇게 해야 디코딩을 수행할 때 문장의 시작/끝을 제대로 감지하는 것이 가능하다.


텍스트 전처리 - 문장 패딩

참고로 문장 패딩의 경우 목적이 원할한 번역이기에
sre_seq_len, tar_seq_len은 각각 데이터셋 내 모든 문서의 가장 최대값으로 설정하거나 그것보다 더 큰 값을 설정하는게 적합하다.

현재 원문 데이터셋은 가장 길이가 긴 문서가 57
번역문 데이터셋의 가장 길이가 긴 문서는 55이니

각각 60, 55를 적용한다.

# 원문(src)의 문장 패딩(정수인코딩 완료)
padded_src_train = pad_seq_x(e_src_train, src_seq_len)
padded_src_val = pad_seq_x(e_src_val, src_seq_len)
padded_src_test = pad_seq_x(e_src_test, src_seq_len)

# 번역문(tar)의 문장 패딩(정수인코딩 완료)
padded_tar_train = pad_seq_x(e_tar_train, tar_seq_len)
padded_tar_val = pad_seq_x(e_tar_val, tar_seq_len)
padded_tar_test = pad_seq_x(e_tar_test, tar_seq_len)


텍스트 전처리 - 데이터로더 변환

import torch
bs = 256 # Batch_size 하이퍼 파라미터

# 정수(원핫)인코딩 데이터를 데이터로더로 변환
trainloader = set_dataloader(padded_src_train, padded_tar_train, bs, 
                             content='훈련', report=True)
valloader = set_dataloader(padded_src_val, padded_tar_val, bs,
                           content='검증', report=True)
testloader = set_dataloader(padded_src_test, padded_tar_test, bs, 
                            content='평가', report=True)

여기까지 수행했다면 이제 seq2seq번역을 수행하기 위한 전처리는 모두 마친게 된다.

이제 모델 학습 후 번역이 어느정도 잘 되는지 확인해보자



4. seq2seq 실습 - 학습/검증

이제 본격적으로 Seq2Seq 모델을 학습시켜보자

주요 하이퍼 파라미터 정의

# 주요 하이퍼 파라미터 정리
src_VS = len(src_to_idx) # 원문 단어장 개수
tar_VS = len(tar_to_idx) # 번역문 단어장 개수
src_SL = src_seq_len # 원문의 문장 길이
# 번역문의 문장 길이를 디코더(생성)문장 길이로 쓰자
tar_SL = tar_seq_len # 번역문 문장 길이

EMB_DIM = 256 # 인코더/디코더의 임베딩 레이어 차원
# unit_dim은 인코더-디코더 사이의 히든레이어처럼 생각하는게 편함
UNIT_DIM = 256 # 인코더와 디코더의 rnn_out 차원값

NUM_Layers = 2 # 인코더/디코더의 셀은 2층으로 만들자
BI_DIR = False # 단방향으로만 학습
from tabulate import tabulate

# 출력할 데이터를 리스트 형식으로 준비
data = [
    ["원문 단어장 개수", f"{src_VS}개"],
    ["번역문 단어장 개수", f"{tar_VS}개"],
    ["원문 문장 길이", f"{src_SL}토큰"],
    ["번역문 문장 길이", f"{tar_SL}토큰"],
    ["원문/번역문 임베딩 차원", EMB_DIM],
    ["인코더-디코더 연결 차원", UNIT_DIM],
    ["셀 층 개수", f"{NUM_Layers}층"],
    ["양방향/단방향", "단방향 학습"],
]

# 표 형식으로 출력
print(tabulate(data, headers=["하이퍼 파라미터", "값"], 
               tablefmt="grid"))

항상 잊지말고 주요 하이퍼 파라미터는 나중에 따로 정리를 해두는 편이 좋다.

이래야 나중에 여러사람이랑 협업을 하거나 Wandb와 같은 웹 사이트에서 모델 훈련 결과를 확인하는 API와 연동할 때 여러모로 안 헤멘다.


모델 설계

Seq2Seq 모델을 설계하면 항상 LSTM을 백본 셀로 활용하는 예제가 많아서
비교군으로 GRU가 백본 셀인 모델도 같이 만들어봤다.

import torch.nn as nn

class Encoder_LSTM(nn.Module):
    def __init__(self, src_vocab, src_emb_dim, rnn_dim, 
                 num_layer=1, bi=False):
        super(Encoder_LSTM, self).__init__()
        # <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
        # word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
        self.embed = nn.Embedding(src_vocab, src_emb_dim,
                                  padding_idx=0)
        self.lstm = nn.LSTM(input_size=src_emb_dim,#언어모델 입력차원
                            hidden_size=rnn_dim,   #언어모델 출력차원
                            num_layers=num_layer,  #언어모델 몇층?
                            bidirectional=bi,      #양방향학습 On?
                            batch_first=True)      #왠만하면 True
    
    def forward(self, x): # x의 차원 : (BS, src_seq_len)
        emb = self.embed(x) # (BS, src_seq_len, src_emb_dim)
        # 인코더에 양방향 학습을 적용한다
        # rnn_out : (bs, src_seq_len, hidden_dim * 2)
        # hidden : (num_layer * 2, bs, hidden_dim)
        rnn_out , (hidden, cell) = self.lstm(emb)
        #인코더의 출력은 context_vector
        return hidden, cell 
import torch.nn as nn

class Decoder_LSTM(nn.Module):
    def __init__(self, tar_vocab, tar_emb_dim, rnn_dim,
                 num_layer=1, bi=False):
        super(Decoder_LSTM, self).__init__()
        # <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
        # word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
        self.embed = nn.Embedding(tar_vocab, tar_emb_dim,
                                  padding_idx=0)
        self.lstm = nn.LSTM(input_size=tar_emb_dim,#언어모델 입력차원
                            hidden_size=rnn_dim,   #언어모델 출력차원
                            num_layers=num_layer,  #언어모델 몇층?
                            bidirectional=bi,      #양방향학습 On?
                            batch_first=True)      #왠만하면 True
        # 디코더의 출력은 정답(번역문)의 seq_len이 되게 해야함
        # 맞춰야 하는 클래스 개수는 정답지의 단어 개수임
        if bi : #양방향으로 학습시에는 FC 레이어 입력차원이 두배
            self.fc = nn.Linear(rnn_dim*2, tar_vocab)
        else:
            self.fc = nn.Linear(rnn_dim, tar_vocab)
    
    # 디코더는 인코더의 context_vector을 초기 hidden으로 입력받는다.
    def forward(self, x, hidden, cell): # x의 차원 : (BS, tar_seq_len)
        emb = self.embed(x) # (BS, tar_seq_len, tar_emb_dim)
        # 디코더에 양방향 학습을 적용한다
        # rnn_out : (bs, tar_seq_len, hidden_dim * 2)
        # hidden : (num_layer * 2, bs, hidden_dim)
        rnn_out, (hidden, cell) = self.lstm(emb, (hidden,cell))
        
        output = self.fc(rnn_out)
        # 최종 출력은 (bs, seq_len, tar_vocab)
        return output, hidden, cell
import torch
import torch.nn as nn

class Seq2Seq_LSTM(nn.Module):
    def __init__(self, encoder, decoder, max_len=None):
        super(Seq2Seq_LSTM, self).__init__()
        self.encode = encoder
        self.decode = decoder
        # 최대 디코딩 길이 설정
        self.max_len = max_len

    def forward(self, src, tar=None, spec_token=None):
        # 인코더의 출력 = context_vector
        context_h, context_c = self.encode(src)

        if tar is not None:
            # 훈련시의 디코더 출력 : 고정길이로 출력한다.
            outputs, _, _ = self.decode(tar, context_h, context_c)
            # 훈련시 출력의 seq_len은 tar(번역문)의 seq_len이다.
            return outputs #(BS, seq_len, tar_vocab)
        
        else: # 정답지가 없는 평가모드
            # 배치사이즈, 연산위치 정보 추출
            BS, src_seq_len = src.size() 
            device = src.device

            # 스페셜 토큰에서 SOS, EOS, PAD의 정수인코딩값 추출
            en_sos = spec_token.index('<SOS>')
            en_eos = spec_token.index('<EOS>')
            en_pad = spec_token.index('<PAD>')

            # 디코더에 입력할 첫번째 토큰을 (bs, 1) 차원에
            # 토큰값은 '<SOS>'토큰으로 채움
            input_token = torch.tensor([[en_sos]] * BS).to(device)
            outputs = [] # 출력 디코드 결과를 저장

            # 최대 디코딩 길이 지정 안했으면 원문 seq_len을 쓰자
            if self.max_len is None:
                self.max_len = src_seq_len
            
            # EOS를 샘플별로 예측하면 그 뒷단을 PAD로 채우는 flag 텐서
            pad_mask_flag = torch.zeros(BS, dtype=torch.bool, 
                                        device=device)
            # 만약에 Batch내 샘플이 <EOS>예측했다면 해당 샘플의 flag가 올라간다.

            # 토큰 단위로 예측이니 자가 회귀 방식임
            h, c = context_h, context_c
            for _ in range(self.max_len):
                # 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
                out_tokens, h, c = self.decode(input_token, h, c)

                # 추론 과정이니 next_token은
                # Greedy Search-> (bs, 1)결과가 됨
                next_token = out_tokens.argmax(dim=-1)
                outputs.append(next_token)

                # 배치 내 샘플이 <EOS>토큰을 예측한다면 해당샘플의 flag를 True로 올린다.
                # 이때 next_token = (bs, 1) -> (bs)로 차원 축소 후 인디케이터 연산해야함
                eos_indices = (next_token.squeeze(1) == en_eos)
                # OR연산이니까 EOS예측된 마스크는 계속 True로 남는다.
                pad_mask_flag = pad_mask_flag | eos_indices

                # 모든 샘플에서 EOS를 예측한 경우, 더이상 수행하지 않고 탈출
                if pad_mask_flag.all():
                    break

                # 자가 회귀방식이니 다음 입력을 갱신한다.
                input_token = torch.where(pad_mask_flag.unsqueeze(1), 
                                          torch.full_like(next_token, en_pad), 
                                          next_token)
            
            # 예측된 단어들을 모아서 최종 출력으로 반환
            # 이때 최종 출력은 # (BS, max_len) 규격을 맞추기 위해
            # PAD토큰을 채우는 과정을 수행한다.
            while len(outputs) < self.max_len:
                outputs.append(torch.full((BS, 1), en_pad, 
                                          device=device))

            outputs = torch.cat(outputs, dim=1)  # (BS, max_len)
            return outputs

여기까지가 Seq2Seq - LSTM버전이고

import torch.nn as nn

class Encoder_GRU(nn.Module):
    def __init__(self, src_vocab, src_emb_dim, rnn_dim, 
                 num_layer=1, bi=False):
        super(Encoder_GRU, self).__init__()
        # <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
        # word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
        self.embed = nn.Embedding(src_vocab, src_emb_dim,
                                  padding_idx=0)
        self.gru = nn.GRU(input_size=src_emb_dim,#언어모델 입력차원
                            hidden_size=rnn_dim,   #언어모델 출력차원
                            num_layers=num_layer,  #언어모델 몇층?
                            bidirectional=bi,      #양방향학습 On?
                            batch_first=True)      #왠만하면 True
    
    def forward(self, x): # x의 차원 : (BS, src_seq_len)
        emb = self.embed(x) # (BS, src_seq_len, src_emb_dim)
        # 인코더에 양방향 학습을 적용한다
        # rnn_out : (bs, src_seq_len, hidden_dim * 2)
        # hidden : (num_layer * 2, bs, hidden_dim)
        rnn_out , hidden = self.gru(emb)
        #인코더의 출력은 context_vector
        return hidden
import torch.nn as nn

class Decoder_GRU(nn.Module):
    def __init__(self, tar_vocab, tar_emb_dim, rnn_dim,
                 num_layer=1, bi=False):
        super(Decoder_GRU, self).__init__()
        # <PAD>토큰의 인덱싱을 지정하면 해당 idx(0)은
        # word_vector을 만들 때 모두 0으로 채워지게 만들어준다.
        self.embed = nn.Embedding(tar_vocab, tar_emb_dim,
                                  padding_idx=0)
        self.gru = nn.GRU(input_size=tar_emb_dim,#언어모델 입력차원
                            hidden_size=rnn_dim,   #언어모델 출력차원
                            num_layers=num_layer,  #언어모델 몇층?
                            bidirectional=bi,      #양방향학습 On?
                            batch_first=True)      #왠만하면 True
        # 디코더의 출력은 정답(번역문)의 seq_len이 되게 해야함
        # 맞춰야 하는 클래스 개수는 정답지의 단어 개수임
        if bi : #양방향으로 학습시에는 FC 레이어 입력차원이 두배
            self.fc = nn.Linear(rnn_dim*2, tar_vocab)
        else:
            self.fc = nn.Linear(rnn_dim, tar_vocab)
    
    # 디코더는 인코더의 context_vector을 초기 hidden으로 입력받는다.
    def forward(self, x, hidden): # x의 차원 : (BS, tar_seq_len)
        emb = self.embed(x) # (BS, tar_seq_len, tar_emb_dim)
        # 디코더에 양방향 학습을 적용한다
        # rnn_out : (bs, tar_seq_len, hidden_dim * 2)
        # hidden : (num_layer * 2, bs, hidden_dim)
        rnn_out, hidden = self.gru(emb, hidden)
        
        output = self.fc(rnn_out)
        # 최종 출력은 (bs, seq_len, tar_vocab)
        return output, hidden
import torch
import torch.nn as nn

class Seq2Seq_GRU(nn.Module):
    def __init__(self, encoder, decoder, max_len=None):
        super(Seq2Seq_GRU, self).__init__()
        self.encode = encoder
        self.decode = decoder
        # 최대 디코딩 길이 설정
        self.max_len = max_len

    def forward(self, src, tar=None, spec_token=None):
        # 인코더의 출력 = context_vector
        context_h = self.encode(src)

        if tar is not None:
            # 훈련시의 디코더 출력 : 고정길이로 출력한다.
            outputs, _ = self.decode(tar, context_h)
            # 훈련시 출력의 seq_len은 tar(번역문)의 seq_len이다.
            return outputs #(BS, seq_len, tar_vocab)
        
        else: # 정답지가 없는 평가모드
            # 배치사이즈, 연산위치 정보 추출
            BS, src_seq_len = src.size() 
            device = src.device

            # 스페셜 토큰에서 SOS, EOS, PAD의 정수인코딩값 추출
            en_sos = spec_token.index('<SOS>')
            en_eos = spec_token.index('<EOS>')
            en_pad = spec_token.index('<PAD>')

            # 디코더에 입력할 첫번째 토큰을 (bs, 1) 차원에
            # 토큰값은 '<SOS>'토큰으로 채움
            input_token = torch.tensor([[en_sos]] * BS).to(device)
            outputs = [] # 출력 디코드 결과를 저장

            # 최대 디코딩 길이 지정 안했으면 원문 seq_len을 쓰자
            if self.max_len is None:
                self.max_len = src_seq_len
            
            # EOS를 샘플별로 예측하면 그 뒷단을 PAD로 채우는 flag 텐서
            pad_mask_flag = torch.zeros(BS, dtype=torch.bool, 
                                        device=device)
            # 만약에 Batch내 샘플이 <EOS>예측했다면 해당 샘플의 flag가 올라간다.

            # 토큰 단위로 예측이니 자가 회귀 방식임
            h = context_h
            for _ in range(self.max_len):
                # 토큰단위로 입력이니 출력은 (bs, 1, tar_vocab)이다.
                out_tokens, h = self.decode(input_token, h)

                # 추론 과정이니 next_token은
                # Greedy Search-> (bs, 1)결과가 됨
                next_token = out_tokens.argmax(dim=-1)
                outputs.append(next_token)

                # 배치 내 샘플이 <EOS>토큰을 예측한다면 해당샘플의 flag를 True로 올린다.
                # 이때 next_token = (bs, 1) -> (bs)로 차원 축소 후 인디케이터 연산해야함
                eos_indices = (next_token.squeeze(1) == en_eos)
                # OR연산이니까 EOS예측된 마스크는 계속 True로 남는다.
                pad_mask_flag = pad_mask_flag | eos_indices

                # 모든 샘플에서 EOS를 예측한 경우, 더이상 수행하지 않고 탈출
                if pad_mask_flag.all():
                    break

                # 자가 회귀방식이니 다음 입력을 갱신한다.
                input_token = torch.where(pad_mask_flag.unsqueeze(1), 
                                          torch.full_like(next_token, en_pad), 
                                          next_token)
            
            # 예측된 단어들을 모아서 최종 출력으로 반환
            # 이때 최종 출력은 # (BS, max_len) 규격을 맞추기 위해
            # PAD토큰을 채우는 과정을 수행한다.
            while len(outputs) < self.max_len:
                outputs.append(torch.full((BS, 1), en_pad, 
                                          device=device))

            outputs = torch.cat(outputs, dim=1)  # (BS, max_len)
            return outputs

여기는 Seq2Seq - GRU 버전이다.

모델 2개를 선언하는 이유는 원래 Seq to Seq Learning with NN 논문에서는 비교군으로 SMT번역모델을 사용했지만
필자는 SMT는 소개할 생각이 없어서
비교군으로 백본 셀만 변경한 모델로 진행한다.


학습 준비

# 학습 실험 조건을 구분하기 위한 키
model_key = ['LSTM', 'GRU']
metrics_key = ['Loss', '정확도']
# Seq2Seq의 LSTM버전 인스턴스화
encoder_lstm = Encoder_LSTM(src_VS, EMB_DIM, UNIT_DIM, 
                            num_layer=NUM_Layers, bi=BI_DIR)
eecoder_lstm = Decoder_LSTM(tar_VS, EMB_DIM, UNIT_DIM,
                            num_layer=NUM_Layers, bi=BI_DIR)
Translater_lstm = Seq2Seq_LSTM(encoder_lstm, eecoder_lstm,max_len=tar_SL)

# Seq2Seq의 GRU버전 인스턴스화
encoder_gru = Encoder_GRU(src_VS, EMB_DIM, UNIT_DIM, 
                            num_layer=NUM_Layers, bi=BI_DIR)
eecoder_gru = Decoder_GRU(tar_VS, EMB_DIM, UNIT_DIM,
                            num_layer=NUM_Layers, bi=BI_DIR)
Translater_gru = Seq2Seq_GRU(encoder_gru, eecoder_gru,max_len=tar_SL)

각각의 버전별 모델을 인스턴스화 한 뒤,

# GPU사용 가능 유/무 확인
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
models = {} # 딕셔너리

models[model_key[0]] = Translater_lstm.to(device)
models[model_key[1]] = Translater_gru.to(device)
import torch.optim as optim
# 로스함수 및 옵티마이저 설계
# 로스함수에서 <PAD> 토큰의 정수인덱스 번호 -> 0번에 대해서는
# 틀리건 맞건 무시하겠다 : ignore_index에 해당 정수 인덱스 번호 기입
ignore_class_idx = spec_token.index('<PAD>')
criterion = nn.CrossEntropyLoss(ignore_index=ignore_class_idx)

LR = 0.001 # 러닝레이트는 통일
optimizers = {}

optimizers[model_key[0]] = optim.Adam(Translater_lstm.parameters(), lr=LR)
optimizers[model_key[1]] = optim.Adam(Translater_gru.parameters(), lr=LR)

학습을 위한 나머지 하이퍼 파라미터 및 옵티마이저, 로스함수 등을 인스턴스화 하자.


학습 함수 설계

Seq2Seq는 따지고 보면 최종 출력물이 RNN : Many-to-Many 옵션이기에
이전 포스트 NLP-LSTM, GRU (3) : 개체명 인식(NER : Named Entity Recognition)에서 설계한

model_train, model_evaluate를 살짝 조정해서 사용하면 된다.

그러나 Seq2Seq는 추가로 설명해야 할 개념에 따른 포스트할게 몇개 좀 더 있기도 하고 변수명도 좀 번역기의 훈련/평가에 적합하게 변경하여 Seq_trainer.py을 새로 만들고 이를 계속 업데이트를 진행하고자 한다.

해당 파일은 역시 총 4개의 함수
data_reshape, cal_correct, model_train, model_evaluate이 구성되어 있다.

import torch
from tqdm import tqdm
def data_reshape(tar_pred, tar_data, tar_mask):
    # 입력되는 tar_pred : (Batch_size, tar_seq_len, tar_vocab_dim)
    # 입력되는 tar_data : (Batch_size, tar_seq_len)
    BS, tar_seq_len, tar_vocab_dim = tar_pred.size()

    re_tar_pred = tar_pred.view(-1, tar_vocab_dim) # (bs*seq, tar_vocab_dim)
    re_tar_data = tar_data.view(-1) #(bs*seq)
    re_tar_mask = tar_mask.view(-1) #마스크는 라벨이랑 차원변환과정이 동일하게 수행됨

    return re_tar_pred, re_tar_data, re_tar_mask
# 정답을 맞출 때 '무시'해야 할 클래스가 있을때 동작하는 함수
def cal_correct(tar_pred, tar_data, tar_mask=None):
    # 이게 Greedy Search임
    G_pred = tar_pred.argmax(dim=1) #가장 높은 예측값 하나 추출

    if tar_mask is not None: #마스크된 항목이 존재할 때
        correct = G_pred.eq(tar_data).masked_select(tar_mask).sum().item()
        total = tar_mask.sum().item() # 전체 원소 개수중 마스크처리된것만 분모
    else:
        correct = G_pred.eq(tar_data).sum().item()
        total = tar_data.numel() # 전체 원소 수를 분모로 처리함
    
    # 수치적 안정성을 보장하면서 연산을 수행하자
    iter_cor = correct / total if total > 0 else 0
    return iter_cor
def model_train(model, data_loader, loss_fn, optimizer_fn, 
                epoch, epoch_step, ignore_class=None):
    # 1개의 epoch내 batch단위(iter)로 연산되는 값이 저장되는 변수들
    iter_size, iter_loss, iter_correct = 0, 0, 0

    device = next(model.parameters()).device # 모델의 연산위치 확인
    model.train() # 모델을 훈련모드로 설정

    #특정 epoch_step 단위마다 tqdm 진행바가 생성되게 설정
    if (epoch+1) % epoch_step == 0 or epoch == 0:
        tqdm_loader = tqdm(data_loader)
    else:
        tqdm_loader = data_loader

    for src_data, tar_data in tqdm_loader:
        src_data, tar_data = src_data.to(device), tar_data.to(device)
        # 번역문의 PAD토큰은 마스크 처리하자
        tar_mask = (tar_data != ignore_class)
        # Forward, 모델이 예측값을 만들게 함
        tar_pred = model(src_data, tar_data) 

        # 데이터의 적절한 구조변환 수행
        tar_pred, tar_data, tar_mask = data_reshape(tar_pred, tar_data, tar_mask)
        # 구조변환을 해야 LossFn에 입력가능한 차원이 됨
        loss = loss_fn(tar_pred, tar_data)

        #backward 과정 수행
        optimizer_fn.zero_grad()
        loss.backward()
        optimizer_fn.step() # 마지막에 스케줄러 있으면 업뎃코드넣기

        BS = src_data.size(0) # Batch내 샘플(iter)연산용 인자값 추출
        # 현재 batch 내 샘플 개수당 correct, loss, 수행 샘플 개수 구하기
        iter_correct += cal_correct(tar_pred, tar_data, tar_mask) * BS
        iter_loss += loss.item() * BS
        iter_size += BS

        # tqdm에 현재 진행상태를 출력하기 위한 코드
        if (epoch+1) % epoch_step == 0 or epoch == 0:
            prograss_loss = iter_loss / iter_size
            prograss_acc = iter_correct / iter_size
            desc = (f"[훈련중]로스: {prograss_loss:.3f}, "
                    f"정확도: {prograss_acc:.3f}")
            tqdm_loader.set_description(desc)

    #현재 epoch에 대한 종합적인 정확도/로스 계산
    epoch_acc = iter_correct / iter_size
    epoch_loss = iter_loss / len(data_loader.dataset)
    return epoch_loss, epoch_acc
def model_evaluate(model, data_loader, loss_fn,
                    epoch, epoch_step, ignore_class=None):
    # 1개의 epoch내 batch단위(iter)로 연산되는 값이 저장되는 변수들
    iter_size, iter_loss, iter_correct = 0, 0, 0

    device = next(model.parameters()).device # 모델의 연산위치 확인
    model.eval() # 모델을 평가 모드로 설정

    #특정 epoch_step 단위마다 tqdm 진행바가 생성되게 설정
    if (epoch+1) % epoch_step == 0 or epoch == 0:
        tqdm_loader = tqdm(data_loader)
    else:
        tqdm_loader = data_loader

    with torch.no_grad(): #평가모드에서는 그래디언트 계산 중단
        for src_data, tar_data in tqdm_loader:
            src_data, tar_data = src_data.to(device), tar_data.to(device)
            # 번역문의 PAD토큰은 마스크 처리하자
            tar_mask = (tar_data != ignore_class)
            # Forward, 모델이 예측값을 만들게 함
            tar_pred = model(src_data, tar_data) 

            # 데이터의 적절한 구조변환 수행
            tar_pred, tar_data, tar_mask = data_reshape(tar_pred, tar_data, tar_mask)
            # 구조변환을 해야 LossFn에 입력가능한 차원이 됨
            loss = loss_fn(tar_pred, tar_data)

            BS = src_data.size(0) # Batch내 샘플(iter)연산용 인자값 추출
            # 현재 batch 내 샘플 개수당 correct, loss, 수행 샘플 개수 구하기
            iter_correct += cal_correct(tar_pred, tar_data, tar_mask) * BS
            iter_loss += loss.item() * BS
            iter_size += BS

    #현재 epoch에 대한 종합적인 정확도/로스 계산
    epoch_acc = iter_correct / iter_size
    epoch_loss = iter_loss / len(data_loader.dataset)
    return epoch_loss, epoch_acc

위 4개의 함수를 Seq_trainer.py 에 넣어 모듈화를 진행했으니
메인 구문에서는 아래의 코드로 불러오면 된다.

from Seq_trainer import *

num_epoch = 8 #총 훈련/검증 epoch값
ES = 2 # 디스플레이용 에포크 스텝

학습/검증

# 학습/검증 정보 저장
history = {key: {metric: [] 
                for metric in metrics_key} 
           for key in model_key}
for key in model_key:
    print(f"\n--현재 훈련중인 조건: [Seq2Seq_{key}]--") # 조건에 맞는 실험시작
    for epoch in range(num_epoch): #에포크별 모델 훈련/검증
        # 모델 훈련
        train_loss, train_acc = model_train(
            models[key], trainloader, criterion,
            optimizers[key], epoch, ES,
            ignore_class=ignore_class_idx #무시할 클래스 인덱스
        ) 
        #모델 검증
        val_loss, val_acc = model_evaluate(
            models[key], valloader, criterion,
            epoch, ES, 
            ignore_class=ignore_class_idx #무시할 클래스 인덱스
        )

        # 손실 및 성과 지표를 history에 저장
        history[key]['Loss'].append((train_loss, val_loss))
        history[key]['정확도'].append((train_acc, val_acc))

        # Epoch_step(ES)일 때마다 print수행
        if (epoch+1) % ES == 0 or epoch == 0:
            print(f"epoch {epoch+1:03d}," + "\t" + 
                f"훈련 [Loss: {train_loss:.3f}, " +
                f"Acc: {train_acc*100:.2f}%]")
            print(f"epoch {epoch+1:03d}," + "\t" + 
                f"검증 [Loss: {val_loss:.3f}, " +
                f"Acc: {val_acc*100:.2f}%]")
    print(f"--[Seq2Seq_{key}] 훈련 종료--\n") # 조건에 맞는 실험종료

학습/검증 결과 분석

import matplotlib.pyplot as plt
# 한글 사용을 위한 폰트 포함
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False
# 학습/검증 결과 데이터를 재배치
res_data = {}

for key in model_key:
    res_data[key] = {}
    for metric in metrics_key:
        # 각 모델의 메트릭 데이터 추출
        metric_data = history[key][metric]
        # 훈련 및 검증 값 분리
        train_values = [tup[0] for tup in metric_data]
        val_values = [tup[1] for tup in metric_data]
        res_data[key][f'훈련_{metric}'] = train_values
        res_data[key][f'검증_{metric}'] = val_values
# 손실 및 정확도 그래프 그리기 그래프 생성
fig, axes = plt.subplots(2, 2, figsize=(12, 12))
axes = axes.flatten()  # 2차원 배열을 1차원으로 변환하여 인덱싱 쉽게 함

# 손실 그래프 그리기
for idx, key in enumerate(model_key):
    ax = axes[idx*2] #손실 그래프는 0, 2번째에 위치
    ax.plot(res_data[key]['훈련_Loss'], label='훈련 로스')
    ax.plot(res_data[key]['검증_Loss'], label='검증 로스')

    ax.set_title(f'Seq2Seq_{key} 번역 Loss', fontsize=16)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.legend()

# 정확도 그래프 그리기
for idx, key in enumerate(model_key):
    ax = axes[idx*2 + 1] #정확도 그래프는 1, 3번째에 위치
    ax.plot(res_data[key]['훈련_정확도'], label='훈련 정확도')
    ax.plot(res_data[key]['검증_정확도'], label='검증 정확도')
    ax.set_title(f'Seq2Seq_{key} 번역 정확도', fontsize=16)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('정확도')
    ax.legend()


plt.tight_layout()
plt.show()

# 모든 모델의 손실과 정확도를 비교하는 그래프 생성
fig, axes = plt.subplots(1, 2, figsize=(16, 8))

# 모든 모델의 손실 그래프
ax = axes[0]
for key in model_key:
    ax.plot(res_data[key]['훈련_Loss'], label=f'Seq2Se2_{key} 훈련 로스')
    ax.plot(res_data[key]['검증_Loss'], label=f'Seq2Se2_{key} 검증 로스', linestyle='--')
ax.set_title('모든 조건별 모델의 훈련/검증 Loss', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend(fontsize=16)  # 범례의 폰트 크기 설정

# 모든 모델의 정확도 그래프
ax = axes[1]
for key in model_key:
    ax.plot(res_data[key]['훈련_정확도'], label=f'Seq2Se2_{key} 훈련 정확도')
    ax.plot(res_data[key]['검증_정확도'], label=f'Seq2Se2_{key} 검증 정확도', linestyle='--')
ax.set_title('모든 조건별 모델의 훈련/검증 정확도', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('정확도')
ax.legend(fontsize=16)  # 범례의 폰트 크기 설정

plt.tight_layout()
plt.show()

훈련/검증 결과를 보면
둘다 100%에 Loss는 거의 0에 가까운 값으로 떨어져서 정말 번역성능이 좋아보인다.

이제 실제 추론과정을 통해서 정말 번역이 잘 되는지를
확인해야 한다.



4. Seq2Seq 추론

seq2seq의 추론작업을 수행하기 전 준비작업을 먼저 진행하자


모델 저장 및 불러오기

# 학습된 모델 저장
path = {} #모델별 경로명 저장
for mk in model_key:
    path[mk] = f'Seq2Seq_{mk}.pth'
    torch.save(models[mk].state_dict(), path[mk])
# 저장된 모델 불러오기
load_model = {
    'LSTM': Translater_lstm,  # LSTM 모델 인스턴스 생성
    'GRU': Translater_gru     # GRU 모델 인스턴스 생성
}
for mk in model_key:
    load_model[mk].load_state_dict(torch.load(path[mk], weights_only=True))
    #추론기는 CPU에서 돌리자
    load_model[mk] = load_model[mk].to('cpu')

추론 작업은 위 학습/검증이 끝난 seq2seq를 저장 후 불러오기 방식으로 사용하며
이때 불러온 모델은 CPU에서 추론작업이 진행되게 설정한다.


데이터 샘플링

정수인코딩까지 완료한 Test 데이터 군 중 일부 데이터만 추출하여 추론작업을 수행한다.

import random
# 테스트 데이터셋에서 샘플을 추출
# 전체 테스트 데이터 개수정보를 추출
num_test = padded_src_test.shape[0]
sample_epoch = 10 #추출할 샘플 개수 정의
indices = random.sample(range(num_test), sample_epoch)

# 추출한 샘플번호를 바탕으로 Test 데이터셋에서 무작위 추출
S_src_test = padded_src_test[indices]
S_tar_test = padded_tar_test[indices]

# 원문 데이터만 텐서 자료형으로 변환
TS_src_test = torch.tensor(S_src_test, dtype=torch.long)

총 10개의 Test 샘플을 추출하여 추론작업을 수행한다.


추론 함수 설계

앞서 model_train, model_evaluate 함수를 설계했으니
이번에는 model_inference함수를 설계하여 이를 사용하고자 한다.

def model_inference(model, src_data, spec_token):

    device = next(model.parameters()).device # 모델의 연산위치 확인
    model.eval() # 모델을 평가 모드로 설정

    with torch.no_grad(): #평가모드에서는 그래디언트 계산 중단
        src_data = src_data.to(device)
        # 추론에서 입력되는 데이터 구조는 (1, src_seq_len)이다.
        # 추론 과정이기에 정답지(tar_data)는 입력하지 않는다.
        tar_infer = model(src_data, spec_token=spec_token)
        # 추론결과는 (BS=1, max_len) -> numpy자료형 변환
        nd_tar_infer = tar_infer.cpu().numpy()

    return nd_tar_infer[0] #BS 차원을 날림

설계한 model_inferenceSeq_trainer.py에 삽입하여 업데이트 한다.


모델 추론 시작

# 추론 결과를 저장할 딕셔너리
tar_dict = {key: []  for key in model_key}

for key in model_key:
    print(f"\n--현재 추론 조건: [Seq2Seq_{key}]--") # 조건에 맞는 실험시작
    for idx in tqdm(range(sample_epoch)): #추론 에포크별 추론 시작
        # 입력되는 원문 차원을 (1, src_seq_len)으로 만들기 위한 코드
        iter_src_data = TS_src_test[idx].unsqueeze(0)
        # 모델 추론 -> 추론결과는 (bs, max_len) ndarray타입임
        tar_infer_doc = model_inference(load_model[key], 
                            iter_src_data, spec_token)
        tar_dict[key].append(tar_infer_doc)

추론이 완료되면 각 모델별 결과물이 tar_dict에 기록된다.


추론 결과 확인

추론한 결과를 확인하려면 디코딩(후처리) 작업을 수행해야 한다.

이를 위한 함수 설계를 먼저 수행하자

def post_processor(encode_doc, idx_to_word, spec_token):
    decode_doc = ['<SOS>'] #코드 안정성을 위해 맨앞에 토큰 추가
    for en_word in encode_doc:
        #인코딩된 단어 -> 디코드화
        #디코딩한 단어가 없는 경우 스페셜토큰의 <UNK>로 변경
        de_word = idx_to_word.get(en_word, spec_token[1])  
        # 디코딩한 단어가 subword인 경우
        if de_word.startswith('##'):
            decode_doc[-1] += de_word[2:]
        # 디코딩한 결과가 스페셜 토큰이 아닌것만 문장에 추가
        elif de_word not in spec_token:
            decode_doc.append(de_word)
    decode_doc.pop(0) #리스트의 맨 앞 원소 삭제
    return ' '.join(decode_doc)

설계한 후처리 함수를 사용하여 번역결과를 확인하자

idx_list = range(1, sample_epoch+1)
for idx, src, pred_lstm, pred_gru, tar in zip(idx_list, S_src_test, 
                                              tar_dict['LSTM'], 
                                              tar_dict['GRU'], 
                                              S_tar_test):
    # 원문, 모델번역문_1, 모델번역문_2, 정답번역문 순으로 디코딩
    decode_src = post_processor(src, idx_to_src, spec_token)
    de_pred_lstm = post_processor(pred_lstm, idx_to_tar, spec_token)
    de_pred_gru = post_processor(pred_gru, idx_to_tar, spec_token)
    decode_tar = post_processor(tar, idx_to_tar, spec_token)

    print(f"{idx}번째 번역 결과 확인")
    print(f"원문(src) : {decode_src}")
    print(f"seqLSTM번역 : {de_pred_lstm}")
    print(f"seq-GRU번역 : {de_pred_gru}")
    print(f"번역문(tar) : {decode_tar}")
    print("==============================\n")

번역이 왜 이따구니...

다음 포스트에서는 학습/검증 성능은 우수하게 나왔으나
실제 추론성능은 형편없게 나온 이유에 대한 분석과

이를 막기 위한 Teacher forcing 비율조정에 대해 포스팅을 진행하겠다.


금일 포스팅한 실습 코드 및 전처리 함수 등은
https://github.com/tbvjvsladla/Seq2Seq
에 업로드 하였습니다

본 포스팅에 사용한 실습 내용은 seq2seq실습-01바닐라버전.ipynb파일을 참조 바랍니다.

profile
자율차 공부중

0개의 댓글