[프로젝트]챗봇 만들기

sobing·2022년 8월 3일
0

project

목록 보기
3/4

해당 포스팅은 텐서플로와 머신러닝으로 시작하는 자연어 처리를 참고하여 작성되었습니다.
책 구매링크는 (http://www.yes24.com/Product/Goods/69334316?pid=123482&cosemkid=nc15495305736503587)입니다.

트랜스포머에 대해 자세한 내용은 이전 포스팅 트랜스포머 논문리뷰에 있습니다.
https://velog.io/@sobing/딥러닝NLPTransformerAttention-Is-All-You-Need

이번 포스팅은 구현에만 초점을 둘 예정입니다.

데이터셋

https://github.com/songys/Chatbot_data 에 있는 데이터셋을 사용했다. 총 11876개의 데이터로 각 데이터는 질문, 대답, 라벨값이다. 라벨값은 0,1,2는 차례대로 중간,긍정,부정을 나타낸다.

import pandas as pd
data=pd.read_csv('ChatbotData.csv')

모델구현

소스구조

|-- data_in
	|-- ChatBotData.csv
    |-- ChatBotData.csv_short	#테스트용도의 축소데이터
|-- data_out
	|-- vocabularyData.voc 		#사전파일	
    |-- check_point			    #체크포인트 저장공간
|-- configs.py
|-- data.py
|-- main.py
|-- model.py
|-- predict.py

configs.py

#주피터노트북 사용시 커널에 전달하기 위해 사용
#tf.app.flags.DEFINE_string('f','','kernel')


tf.app.flags.DEFINE_integer('batch_size', 64, 'batch size')  # 배치 크기
tf.app.flags.DEFINE_integer('train_steps', 20000, 'train steps')  # 학습 에포크
tf.app.flags.DEFINE_float('dropout_width', 0.5, 'dropout width')  # 드롭아웃 크기
tf.app.flags.DEFINE_integer('embedding_size', 128, 'embedding size')  # 가중치 크기 # 논문 512 사용
tf.app.flags.DEFINE_float('learning_rate', 1e-3, 'learning rate')  # 학습률
tf.app.flags.DEFINE_integer('shuffle_seek', 1000, 'shuffle random seek')  # 셔플 시드값
tf.app.flags.DEFINE_integer('max_sequence_length', 25, 'max sequence length')  # 시퀀스 길이
tf.app.flags.DEFINE_integer('model_hidden_size', 128, 'model weights size')  # 모델 가중치 크기
tf.app.flags.DEFINE_integer('ffn_hidden_size', 512, 'ffn weights size')  # ffn 가중치 크기
tf.app.flags.DEFINE_integer('attention_head_size', 4, 'attn head size')  # 멀티 헤드 크기
tf.app.flags.DEFINE_integer('layer_size', 2, 'layer size')  # 논문은 6개 레이어이나 2개 사용 학습 속도 및 성능 튜닝
tf.app.flags.DEFINE_string('data_path', '../data_in/ChatBotData.csv', 'data path')  # 데이터 위치
tf.app.flags.DEFINE_string('vocabulary_path', './data_out/vocabularyData.voc', 'vocabulary path')  # 사전 위치
tf.app.flags.DEFINE_string('check_point_path', './data_out/check_point', 'check point path')  # 체크 포인트 위치
tf.app.flags.DEFINE_boolean('tokenize_as_morph', False, 'set morph tokenize')  # 형태소에 따른 토크나이징 사용 유무
tf.app.flags.DEFINE_boolean('xavier_initializer', True, 'set xavier initializer')  # xavier initializer를 사용할 것인지에 대한 

# Define FLAGS
DEFINES = tf.app.flags.FLAGS

사용될 하이퍼파라미터 값들이 지정되어있다. 이값들은 각 코드파일에서 다음과 같이 불러와 사용하면 된다.

from configs ipmort DEFINES

data.py

이 파일에는 데이터를 불러와 전처리를 하는 과정이다.
만약 코랩에서 사용하는 경우 다음코드를 통해 konlpy다운 받아야 한다.

!apt-get update
!apt-get install g++ openjdk-8-jdk 
!pip install konlpy JPype1-py3
!bash <(curl -s https://raw.githubusercontent.com/konlpy/konlpy/master/scripts/mecab.sh)
!pip install konlpy

다음은 필요한 모듈들이다.

from konlpy.tag import Twitter
import pandas as pd
import tensorflow as tf
import enum
import os
import re
from sklearn.model_selection import train_test_split
import numpy as np
from configs import DEFINES #config파일 사용위해

from tqdm import tqdm #진행속도를 알려줌

정규표현식에 사용할 필터들, 사용할 토큰들을 정의한다.

FILTERS = "([~.,!?\"':;)(])" #나중에 특수문자 제거할때 사용
PAD = "<PAD>"
STD = "<SOS>"
END = "<END>"
UNK = "<UNK>"

PAD_INDEX = 0
STD_INDEX = 1
END_INDEX = 2
UNK_INDEX = 3

MARKER = [PAD, STD, END, UNK]
CHANGE_FILTER = re.compile(FILTERS) #미리 컴파일을 해두면 패턴사용시 시간을 줄일 수 있다.

이제 데이터를 로드한다.

def load_data():
    
    data_df = pd.read_csv(DEFINES.data_path, header=0)
    #질문과 답변으로 나눈다.
    question, answer = list(data_df['Q']), list(data_df['A'])
    
    #학습데이터와 검증용 데이터로 나눈다.
    train_input, eval_input, train_label, eval_label = train_test_split(question, answer, test_size=0.3,random_state=42)
    
    return train_input, train_label, eval_input, eval_label

형태소로 분리하는 작업을 하는 함수를 만들어준다. 이 함수는 configs.py파일을 통해 사용할지 말지 결정할 수 있다.

def prepro_like_morphlized(data):
    morph_analyzer = Twitter() #형태소 분석하는 객체
    result_data = list()
    for seq in tqdm(data): #매 문장마다 형태소분석을 하는 반복문
        
        morphlized_seq = " ".join(morph_analyzer.morphs(seq.replace(' ', '')))
        result_data.append(morphlized_seq)

    return result_data

이제 데이터를 인코더 부분과 디코더 부분에 대해 각각 다르게 전처리를 해야한다.
아래 코드를 보면 value,dictionary를 받는데, 형태소를 기준으로 토크나이징을 할지 띄어쓰기 기준이로 토크나이징할지에 따라 어떻게 처리되는지 나뉜다.

#value가 단어이고 단어값을 기준으로 딕셔너리에서 인덱스를 받아옴
def enc_processing(value, dictionary):
    #시퀀스의 인풋 인덱스
    sequences_input_index = []
    
    # 형태소 토크나이징 사용 유무
    if DEFINES.tokenize_as_morph:
        value = prepro_like_morphlized(value)

    
    for sequence in value:
        #특수문자 제거
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        
        #하나의 문장을 인코딩할때
        sequence_index = []
        
        #문장을 띄어쓰기 단어로 잘라서 처리
        for word in sequence.split():
            #단어들이 딕셔너리에 존재하면 인덱스값을 추가
            if dictionary.get(word) is not None:
                sequence_index.extend([dictionary[word]])
            #존재하지 않으면 UNK토큰 사용
            else:
                sequence_index.extend([dictionary[UNK]])
        # 최대 길이보다 긴문장인 경우
        if len(sequence_index) > DEFINES.max_sequence_length:
            sequence_index = sequence_index[:DEFINES.max_sequence_length]
        
        
        sequences_length.append(len(sequence_index))
        
        #짧은 문장인 경우
        sequence_index += (DEFINES.max_sequence_length - len(sequence_index)) * [dictionary[PAD]]
        
        # 인덱스화 되어 있는 값을 sequences_input_index에 넣어 준다.
        sequences_input_index.append(sequence_index)
    
    return np.asarray(sequences_input_index), sequences_length

위 코드를 살펴보면 특수문자를 모두 제거후 단어사전을 이용해 단어를 인덱스로 바꾼다. 만약 포함되어있지 않으면 UNK토큰을 사용한다. 그 후 모델에 적용할 최대 길이보다 긴 문장일 경우에는 잘라내고 짧은 문장일 경우에는 뒷부분에 패딩값으로 채워준다.

이제 디코더 부분에 필요한 전처리 함수가 필요하다. 디코더에는 두가지 전처리 함수가 필요하다. 하나는 디코더 입력값을 만들고, 또다른 하나는 디코더 타깃값을 위한 함수이다.
입력값에는 문장의 시작과 끝에 start,padding토큰이 있고, 타깃값에는 문장의 끝에 end,padding토큰이 있다.

먼저 디코더의 입력값을 만드는 함수먼저 보자. 시작토큰을 넣어주고 나머지는 인코더 입력을 위한 전처리 함수와 매우 비슷하다.

def dec_output_processing(value, dictionary):
    
    sequences_output_index = []
    sequences_length = []
    
    # 형태소 토크나이징 사용 유무
    if DEFINES.tokenize_as_morph:
        value = prepro_like_morphlized(value)
    
    for sequence in value:
        
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        sequence_index = []
        
        # start토큰을 문장의 앞에 넣어주고 띄어쓰기 단위별로 인덱스화 시켜준다.
        sequence_index = [dictionary[STD]] + [dictionary[word] for word in sequence.split()]
        # 문장길이 제한길이보다 긴 경우
        if len(sequence_index) > DEFINES.max_sequence_length:
            sequence_index = sequence_index[:DEFINES.max_sequence_length]
        
        sequences_length.append(len(sequence_index))
        # 문장길이 짧은 경우
        sequence_index += (DEFINES.max_sequence_length - len(sequence_index)) * [dictionary[PAD]]
        sequences_output_index.append(sequence_index)
    
    return np.asarray(sequences_output_index), sequences_length

이제 디코더 타깃값을 만드는 전처리 함수를 보자. 토큰만 달라지기때문에 매우 비슷하다.


def dec_target_processing(value, dictionary):
    
    sequences_target_index = []
    
    if DEFINES.tokenize_as_morph:
        value = prepro_like_morphlized(value)
    
    for sequence in value:
        
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        sequence_index = [dictionary[word] for word in sequence.split()]
        #문장이 길어진 경우 자르고 end토큰 넣어준다.
        if len(sequence_index) >= DEFINES.max_sequence_length:
            sequence_index = sequence_index[:DEFINES.max_sequence_length - 1] + [dictionary[END]]
        else:
            sequence_index += [dictionary[END]]
        # 짧은경우
        sequence_index += (DEFINES.max_sequence_length - len(sequence_index)) * [dictionary[PAD]]
        
        sequences_target_index.append(sequence_index)
    
    return np.asarray(sequences_target_index)

이제 학습한 모델을 통해 예측할때 사용하는 함수이다. 예측결과는 단어의 인덱스 벡터 형태이기 때문에 실제 단어들로 바꿔주는 함수이다.

def pred2string(value, dictionary):
    # 결과값을 저장할 리스트
    sentence_string = []
    
    # 인덱스 배열 하나를 꺼내서 v에 넘겨준다.
    for v in value:
        # 딕셔너리에 있는 단어로 변경해서 배열에 담는다.
        sentence_string = [dictionary[index] for index in v['indexs']]

    
    print(sentence_string)
    
    answer = ""
    # 패딩은 모두 스페이스 처리 한다.
    for word in sentence_string:
        if word not in PAD and word not in END:
            answer += word
            answer += " "
    # 결과를 출력한다.
    print(answer)
    return answer

이제 다음 문장을 예측하는 함수이다. 위 함수와 배우 비슷하다.

def pred_next_string(value, dictionary):
    
    sentence_string = []
    is_finished = False #문장이 끝났는지 아닌지 표시

    
    for v in value:
        
        sentence_string = [dictionary[index] for index in v['indexs']]

    answer = ""
    
    for word in sentence_string:
    	#end토큰을 만나면 문장이 끝났으니 is_finished도 True로 바꾼뒤 반복문을 빠져나온다.
        if word == END:
            is_finished = True
            break

        if word != PAD and word != END:
            answer += word
            answer += " "

    # 결과를 출력한다.
    return answer, is_finished

단어 사전 또한 인덱스화 되어야한다. 이를 만들기 위해서 데이터를 전처리해 단어 리스트로 먼저 만들어야 한다.

def data_tokenizer(data):
    # 토크나이징 해서 담을 배열 생성
    words = []
    for sentence in data:
    	sentence = re.sub(CHANGE_FILTER, "", sentence)
        for word in sentence.split():
            words.append(word)
    return [word for word in words if word]

이제 전체 데이터의 모든 단어를 포함하는 단어 리스트로 만든다. 단어사전을 만드는 과정이다.

def load_vocabulary():
    # 사전을 담을 배열
    vocabulary_list = []
    # 우선적으로 경로에 단어 사전 파일이 있다면 불러와서 사용하고 없다면 만든다.
    if (not (os.path.exists(DEFINES.vocabulary_path))):
        # 단어 사전이 존재하지 않으므로 만들어야 한다. 데이터셋은 config파일에서 파일경로에서 가져온다.
        if (os.path.exists(DEFINES.data_path)):
            data_df = pd.read_csv(DEFINES.data_path, encoding='utf-8')
            question, answer = list(data_df['Q']), list(data_df['A'])
            if DEFINES.tokenize_as_morph:  # 형태소에 따른 토크나이져 처리
                question = prepro_like_morphlized(question)
                answer = prepro_like_morphlized(answer)
            data = []
            #데이터 모두 가져와 모든 단어들을 포함하게 만든다.
            data.extend(question)
            data.extend(answer)
            words = data_tokenizer(data)
            words = list(set(words)) #중복제거
            
            # MAKER로 정의한 토큰들도 사전에 추가
            words[:0] = MARKER
        # 해당 경로에 저장
        with open(DEFINES.vocabulary_path, 'w', encoding='utf-8') as vocabulary_file:
            for word in words:
                vocabulary_file.write(word + '\n')

    # 사전 파일이 존재할때 사전 읽어오기
    with open(DEFINES.vocabulary_path, 'r', encoding='utf-8') as vocabulary_file:
        for line in vocabulary_file:
            vocabulary_list.append(line.strip())

    # make_vocabulary함수를 적용한다. 아래에 정의되어있다.
    char2idx, idx2char = make_vocabulary(vocabulary_list)
    # 두가지 형태의 키와 값이 있는 형태를 리턴한다. 
    # 단어: 인덱스 , 인덱스: 단어)
    return char2idx, idx2char, len(char2idx)
def make_vocabulary(vocabulary_list):
    
    char2idx = {char: idx for idx, char in enumerate(vocabulary_list)}
    idx2char = {idx: char for idx, char in enumerate(vocabulary_list)}
    # 두개의 딕셔너리를 넘겨 준다.
    return char2idx, idx2char

하나는 단어에 대한 인덱스를 나타내고, 하나는 인덱스에 대한 단어를 나타내도록 하는 딕셔너리이다.
이제 데이터 전처리 함수는 모두 끝났다.

학습과 평가에 사용할 데이터셋을 만들기 전에 다음 함수를 먼저 정의한다.

def rearrange(input, output, target):
    features = {"input": input, "output": output}
    return features, target

모델에 3개의 데이터가 적용이 되는데, 2개는 각각 인코더와 디코더에 적용되는 입력값이고 하나는 타깃값으로 디코더에서 사용될 값이다. 이 함수는 3개의 인자값을 받고 2개의 입력값은 하나의 딕셔너리 형태로 묶는다.

텐서플로 모델에 데이터를 적용하기위해 데이터를 만드는 함수를 만들어보자. 다음은 학습에 들어가 배치 데이터를 만드는 함수이다.


def train_input_fn(train_input_enc, train_output_dec, train_target_dec, batch_size):
    # Dataset을 생성하는 부분으로써 from_tensor_slices부분은 인자들을 각각 한 문장으로 자른다
    dataset = tf.data.Dataset.from_tensor_slices((train_input_enc, train_output_dec, train_target_dec))
    
    dataset = dataset.shuffle(buffer_size=len(train_input_enc))
    
    # 배치 인자 값이 없다면  에러를 발생 시킨다.
    assert batch_size is not None, "train batchSize must not be None"
    
    # from_tensor_slices를 통해 나눈것을 배치크기 만큼 묶어 준다.
    dataset = dataset.batch(batch_size, drop_remainder=True)
    
    # 데이터 각 요소에 대해서 rearrange 함수를 통해서 요소를 변환하여 맵으로 구성한다.
    dataset = dataset.map(rearrange)
    
    # repeat()함수에 아무 인자도 없다면 무한으로 이터레이터 된다.
    dataset = dataset.repeat()
    
    # make_one_shot_iterator를 통해 이터레이터를 만들어 준다.
    iterator = dataset.make_one_shot_iterator()
    
    # 이터레이터를 통해 다음 항목의 개체를 넘겨준다.
    return iterator.get_next()    

인자값으로는 인코더에 적용될 입력값, 디코더에 적용될 입력값, 학습 시 디코더에서 사용될 타깃값이다.
assert문을 통해 배치크기를 인자로 전달하지 않는 경우에는 오류를 발생시킨다.

이제 다음은 평가에 들어가 배치 데이터를 만드는 함수이다. 위 함수에 매우 비슷하다.


def eval_input_fn(eval_input_enc, eval_output_dec, eval_target_dec, batch_size):
   
    dataset = tf.data.Dataset.from_tensor_slices((eval_input_enc, eval_output_dec, eval_target_dec))
    dataset = dataset.shuffle(buffer_size=len(eval_input_enc))
    assert batch_size is not None, "eval batchSize must not be None"
    dataset = dataset.batch(batch_size, drop_remainder=True)
    dataset = dataset.map(rearrange)
    
    # 평가이므로 1회만 동작 시킨다.
    dataset = dataset.repeat(1)
    iterator = dataset.make_one_shot_iterator()
    
    return iterator.get_next()

이제 마지막 코드를 추가해주면 data.py는 완성되었다.

def main(self):
    char2idx, idx2char, vocabulary_length = load_vocabulary()


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main)

model.py

필요한 모듈 임포트해준다.

import tensorflow as tf
import sys

from configs import DEFINES
import numpy as np

Residual Connection

입력정보 x가 있고 신경망을 거쳐나온 정보가 F(x)일때 전달되는 값을 x+F(x)로 주는 것이다.

def layer_norm(inputs, eps=1e-6):
    feature_shape = inputs.get_shape()[-1:]
    #  평균과 표준편차을 넘겨 준다.
    mean = tf.keras.backend.mean(inputs, [-1], keepdims=True)
    std = tf.keras.backend.std(inputs, [-1], keepdims=True)
    beta = tf.Variable(tf.zeros(feature_shape), trainable=False)
    gamma = tf.Variable(tf.ones(feature_shape), trainable=False)

    return gamma * (inputs - mean) / (std + eps) + beta


def sublayer_connection(inputs, sublayer, dropout=0.2):
    
    outputs = layer_norm(inputs + tf.keras.layers.Dropout(dropout)(sublayer))
    return outputs

sublayer_connection에서 residual learning을 구현한뒤 layer_norm에서 layer normalization을 구현한다.
residual connection뒤 dropout하고 layer normalization을 하는 구조이다.
layer_norm에서 eps을 사용하는 이유는 0으로 나눠지면 오류가 발생하기 때문이다.

Position-wise Feed Forward Network

FFN(x) = max(0, xW1 + b1)W2 + b2
FFN의 식은 위와 같다. 다음과 같이 구현한다.

def feed_forward(inputs, num_units):
    
    feature_shape = inputs.get_shape()[-1]
    inner_layer = tf.keras.layers.Dense(num_units, activation=tf.nn.relu)(inputs)
    outputs = tf.keras.layers.Dense(feature_shape)(inner_layer)

    return outputs

두번의 선형결합과 활성화 함수 ReLU를 사용한다.
선형결합 후 ReLu를 통과시킨뒤에 선형결합을 한번더 하는 구조이다.

Positional Encoding

PE(pos,2i)=sin(pos/100002i/dmodel)PE(pos,2i) = sin(pos/10000^{{2i}/d_{model}})
PE(pos,2i+1)=cos(pos/100002i/dmodel)PE(pos,2i+1) = cos(pos/10000^{{2i}/d_{model}})
식은 위와 같다. 인덱스가 짝수이면 사인함수를 사용하고 홀수이면 코사인함수를 이용한다.

def positional_encoding(dim, sentence_length):
    encoded_vec = np.array([pos / np.power(10000, 2 * i / dim)
                            for pos in range(sentence_length) for i in range(dim)])
    encoded_vec[::2] = np.sin(encoded_vec[::2])
    encoded_vec[1::2] = np.cos(encoded_vec[1::2])
    return tf.constant(encoded_vec.reshape([sentence_length, dim]), dtype=tf.float32)

인덱스의 홀짝에 따라 encoded_vec를 다르게 할당해 포지셔널 인코딩을 한다.

Attention

def scaled_dot_product_attention(query, key, value, masked=False):
    # Attention(Q, K, V ) = softmax(QKt / root dk)V
    key_dim_size = float(key.get_shape().as_list()[-1])
    key = tf.transpose(key, perm=[0, 2, 1])
    outputs = tf.matmul(query, key) / tf.sqrt(key_dim_size)

    if masked:
        diag_vals = tf.ones_like(outputs[0, :, :])
        tril = tf.linalg.LinearOperatorLowerTriangular(diag_vals).to_dense()
        masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(outputs)[0], 1, 1])

        paddings = tf.ones_like(masks) * (-2 ** 32 + 1)
        outputs = tf.where(tf.equal(masks, 0), paddings, outputs)

    attention_map = tf.nn.softmax(outputs)

    return tf.matmul(attention_map, value)
def multi_head_attention(query, key, value, num_units, heads, masked=False):
    query = tf.keras.layers.Dense(num_units, activation=tf.nn.relu)(query)
    key = tf.keras.layers.Dense(num_units, activation=tf.nn.relu)(key)
    value = tf.keras.layers.Dense(num_units, activation=tf.nn.relu)(value)

    query = tf.concat(tf.split(query, heads, axis=-1), axis=0)
    key = tf.concat(tf.split(key, heads, axis=-1), axis=0)
    value = tf.concat(tf.split(value, heads, axis=-1), axis=0)

    attention_map = scaled_dot_product_attention(query, key, value, masked)

    attn_outputs = tf.concat(tf.split(attention_map, heads, axis=0), axis=-1)
    attn_outputs = tf.keras.layers.Dense(num_units, activation=tf.nn.relu)(attn_outputs)

    return attn_outputs

attention연산에 대한 자세한 내용과 연산방식은 맨위 링크에 설명해 놓았다.

Encoder

def encoder_module(inputs, model_dim, ffn_dim, heads)def encoder_module(inputs, model_dim, ffn_dim, heads)`:
    self_attn = sublayer_connection(inputs, multi_head_attention(inputs, inputs, inputs,
                                                                 model_dim, heads))
    outputs = sublayer_connection(self_attn, feed_forward(self_attn, ffn_dim))
    return outputs
    
def encoder(inputs, model_dim, ffn_dim, heads, num_layers):
    outputs = inputs
    for i in range(num_layers):
        outputs = encoder_module(outputs, model_dim, ffn_dim, heads)

    return outputs

Decoder

def decoder_module(inputs, encoder_outputs, model_dim, ffn_dim, heads):
    masked_self_attn = sublayer_connection(inputs, multi_head_attention(inputs, inputs, inputs,
                                                                        model_dim, heads, masked=True))
    self_attn = sublayer_connection(masked_self_attn, multi_head_attention(masked_self_attn, encoder_outputs,
                                                                           encoder_outputs, model_dim, heads))
    outputs = sublayer_connection(self_attn, feed_forward(self_attn, ffn_dim))

    return outputs

def decoder(inputs, encoder_outputs, model_dim, ffn_dim, heads, num_layers):
    outputs = inputs
    for i in range(num_layers):
        outputs = decoder_module(outputs, encoder_outputs, model_dim, ffn_dim, heads)

    return outputs

인코더와 디코더에 대한 자세한 구조도 맨위 링크에 설명해 놓았다.

Model

def Model(features, labels, mode, params):
    TRAIN = mode == tf.estimator.ModeKeys.TRAIN
    EVAL = mode == tf.estimator.ModeKeys.EVAL
    PREDICT = mode == tf.estimator.ModeKeys.PREDICT

    position_encode = positional_encoding(params['embedding_size'], params['max_sequence_length'])

    if params['xavier_initializer']:
        embedding_initializer = 'glorot_normal'
    else:
        embedding_initializer = 'uniform'

    embedding = tf.keras.layers.Embedding(params['vocabulary_length'],
                                          params['embedding_size'],
                                          embeddings_initializer=embedding_initializer)

    x_embedded_matrix = embedding(features['input']) + position_encode
    y_embedded_matrix = embedding(features['output']) + position_encode

    encoder_outputs = encoder(x_embedded_matrix, params['model_hidden_size'], params['ffn_hidden_size'],
                              params['attention_head_size'], params['layer_size'])
    decoder_outputs = decoder(y_embedded_matrix, encoder_outputs, params['model_hidden_size'],
                              params['ffn_hidden_size'],
                              params['attention_head_size'], params['layer_size'])

    logits = tf.keras.layers.Dense(params['vocabulary_length'])(decoder_outputs)

    predict = tf.argmax(logits, 2)

    if PREDICT:
        predictions = {
            'indexs': predict,
            'logits': logits,
        }
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)

    # 정답 차원 변경을 한다. [배치 * max_sequence_length * vocabulary_length]  
    # logits과 같은 차원을 만들기 위함이다.
    labels_ = tf.one_hot(labels, params['vocabulary_length'])
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=labels_))

    accuracy = tf.metrics.accuracy(labels=labels, predictions=predict)

    metrics = {'accuracy': accuracy}
    tf.summary.scalar('accuracy', accuracy[1])

    if EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

    assert TRAIN

    # lrate = d−0.5 *  model · min(step_num−0.5, step_num · warmup_steps−1.5)
    optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())

    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

먼저 임베딩 행렬을 생성을 두개 한다. 하나는 위치정보를 포함하는 포지션 인코딩 행렬이고 하나는 각 단어를 담은 임베딩 매트릭스 이다.

인코더와 디코더의 입력값을 임베딩 처리해야하기 때문에 앞에 정의한 임베딩 객체에 각 입력값을 적용해 임베딩 값을 받아온다. 그후 임베딩된 값에 포지셔널 인코딩값을 더해주는 것이다.

먼저 정의한 인코더모듈과 디코더모듈을 이용해 인코더와 디코더를 만든다.
인코더는 입력으로 앞에 만든 임베딩된 매트릭스와 모델차원, 피드포워드 차원수, 어텐션모듈 head개수, 인코더를 몇개 쌓을지결정한다.
디코더는 입력으로 인코더의 출력값과 나머지 인자값은 인코더처럼 params를 이용하면 된다.

predict.py

import tensorflow as tf
import data
import os
import sys
import model as ml

from configs import DEFINES

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.ERROR)
    arg_length = len(sys.argv)

    if (arg_length < 2):
        raise Exception("Don't call us. We'll call you")

    # 데이터를 통한 사전 구성
    char2idx, idx2char, vocabulary_length = data.load_vocabulary()

    # 테스트용 데이터 만드는 부분
    # 인코딩 부분 
    input = ""
    for i in sys.argv[1:]:
        input += i
        input += " "

    print(input)
    predic_input_enc, predic_input_enc_length = data.enc_processing([input], char2idx)
    
    predic_output_dec, predic_output_dec_length = data.dec_output_processing([""], char2idx)
    
    predic_target_dec = data.dec_target_processing([""], char2idx)

    # 에스티메이터 구성
    classifier = tf.estimator.Estimator(
        model_fn=ml.Model,  # 모델정의
        model_dir=DEFINES.check_point_path,  # 체크포인트 위치 등록한다.
        params={  # 모델 쪽으로 파라미터 전달
            'embedding_size': DEFINES.embedding_size,
            'model_hidden_size': DEFINES.model_hidden_size,  # 가중치 크기 
            'ffn_hidden_size': DEFINES.ffn_hidden_size,
            'attention_head_size': DEFINES.attention_head_size,
            'learning_rate': DEFINES.learning_rate,  # 학습률
            'vocabulary_length': vocabulary_length,  # 딕셔너리 크기
            'embedding_size': DEFINES.embedding_size,  # 임베딩 크기
            'layer_size': DEFINES.layer_size,
            'max_sequence_length': DEFINES.max_sequence_length,
            'xavier_initializer': DEFINES.xavier_initializer
        })

    for i in range(DEFINES.max_sequence_length):
        if i > 0:
            predic_output_dec, predic_output_decLength = data.dec_output_processing([answer], char2idx)
            predic_target_dec = data.dec_target_processing([answer], char2idx)
        # 예측
        predictions = classifier.predict(
            input_fn=lambda: data.eval_input_fn(predic_input_enc, predic_output_dec, predic_target_dec, 1))

        answer, finished = data.pred_next_string(predictions, idx2char)

        if finished:
            break

    # 예측한 값을 텍스트로 변경하는 부분이다.
    print("answer: ", answer)

main.py

import tensorflow as tf
import model as ml
import data
import numpy as np
import os
import sys

from configs import DEFINES

DATA_OUT_PATH = './data_out/'


def main(self):
    data_out_path = os.path.join(os.getcwd(), DATA_OUT_PATH)
    os.makedirs(data_out_path, exist_ok=True)
    
    char2idx, idx2char, vocabulary_length = data.load_vocabulary()
    
    train_input, train_label, eval_input, eval_label = data.load_data()

    # 훈련셋 인코딩
    train_input_enc, train_input_enc_length = data.enc_processing(train_input, char2idx)
    # 훈련셋 디코딩 입력 부분 
    train_output_dec, train_output_dec_length = data.dec_output_processing(train_label, char2idx)
    # 훈련셋 디코딩 출력 부분 
    train_target_dec = data.dec_target_processing(train_label, char2idx)

    # 평가셋 인코딩 
    eval_input_enc, eval_input_enc_length = data.enc_processing(eval_input, char2idx)
    # 평가셋 디코딩 입력 부분 
    eval_output_dec, eval_output_dec_length = data.dec_output_processing(eval_label, char2idx)
    # 평가셋 디코딩 출력 부분 
    eval_target_dec = data.dec_target_processing(eval_label, char2idx)

    # 체크 포인트를 저장한 디렉토리를 설정한다.
    check_point_path = os.path.join(os.getcwd(), DEFINES.check_point_path)
    # 디렉토리를 만드는 함수이며 두번째 인자 exist_ok가 True이면 디렉토리가 이미 존재해도 OSError가 발생하지 않는다.
    # exist_ok가 False이면 이미 존재하면 OSError가 발생
    os.makedirs(check_point_path, exist_ok=True)

    # 에스티메이터 구성
    classifier = tf.estimator.Estimator(
        model_fn=ml.Model,  # 모델 등록한다.
        model_dir=DEFINES.check_point_path,  # 체크포인트 위치 등록한다.
        params={  # 모델 쪽으로 파라메터 전달한다.
            'embedding_size': DEFINES.embedding_size,
            'model_hidden_size': DEFINES.model_hidden_size,  
            'ffn_hidden_size': DEFINES.ffn_hidden_size,
            'attention_head_size': DEFINES.attention_head_size,
            'learning_rate': DEFINES.learning_rate,  
            'vocabulary_length': vocabulary_length,  
            'embedding_size': DEFINES.embedding_size,  
            'layer_size': DEFINES.layer_size,
            'max_sequence_length': DEFINES.max_sequence_length,
            'xavier_initializer': DEFINES.xavier_initializer
        })

    # 학습 실행
    classifier.train(input_fn=lambda: data.train_input_fn(
        train_input_enc, train_output_dec, train_target_dec, DEFINES.batch_size), steps=DEFINES.train_steps)

    eval_result = classifier.evaluate(input_fn=lambda: data.eval_input_fn(
        eval_input_enc, eval_output_dec, eval_target_dec, DEFINES.batch_size))
    print('\nEVAL set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

    
    predic_input_enc, predic_input_enc_length = data.enc_processing(["가끔 궁금해"], char2idx)
    
    predic_output_dec, predic_output_decLength = data.dec_output_processing([""], char2idx)
    
    predic_target_dec = data.dec_target_processing([""], char2idx)

    for i in range(DEFINES.max_sequence_length):
        if i > 0:
            predic_output_dec, predic_output_decLength = data.dec_output_processing([answer], char2idx)
            predic_target_dec = data.dec_target_processing([answer], char2idx)
        # 예측을 하는 부분이다.
        predictions = classifier.predict(
            input_fn=lambda: data.eval_input_fn(predic_input_enc, predic_output_dec, predic_target_dec, 1))

        answer, finished = data.pred_next_string(predictions, idx2char)

        if finished:
            break

    # 예측한 값을 텍스트로 변경하는 부분이다.
    print("answer: ", answer)


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main)

tf.logging.set_verbosity

0개의 댓글