[Chatbot] Data Preprocessing

박경민·2023년 4월 5일
0

[ChatBot Project]

목록 보기
11/11
post-custom-banner
# Building a Chatbot 

#라이브러리 가져오기 
import numpy as np
#import tensorflow as tf 
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import re
import time 


#### 1. Data Preprocessing #### 
lines = open('movie_lines.txt', encoding= 'utf-8', errors = 'ignore').read().split('\n')
conversations = open('movie_conversations.txt', encoding= 'utf-8', errors = 'ignore').read().split('\n')

# 각각의 대사와 아이디를 매칭하는 딕셔너리
id2line = {}
for line in lines:
    _line = line.split(' +++$+++ ')
    if len(_line) == 5:
        id2line[_line[0]] = _line[4] 
 

# 모든 대사 번호 리스트인 conversations_ids 
conversations_ids = []

for conversation in conversations[:-1]:
    _conversation = conversation.split(' +++$+++ ')[-1][1:-1].replace("'", "").replace(" ", "")
    conversations_ids.append(_conversation.split(','))
    
# 질문과 응답 목록 개별 생성 
questions, answers = [], []

for conversation in conversations_ids:
    for i in range(len(conversation)-1): #+1 이용할 것이므로 -1
        # 딕셔너리에서 매핑해서 어펜드 (i를 왜 중복해서 넣는지 잘 모르겠지만, 일단 넘어가자.)
        questions.append(id2line[conversation[i]]) # 질문 
        answers.append(id2line[conversation[i+1]]) # 질문 다음이 무조건 대답이라고 봄.


# 첫 텍스트 클리닝 함수
def clean_text(text):
    text = text.lower()
    text = re.sub(r"i'm","i am", text)
    text = re.sub(r"he's", "he is", text)
    text = re.sub(r"she's", "she is", text)
    text = re.sub(r"that's", "that is", text)
    text = re.sub(r"what's", "what is", text)
    text = re.sub(r"where's", "where is", text)
    text = re.sub(r"\'ll", " will", text)
    text = re.sub(r"\'ve", " have", text)
    text = re.sub(r"\'re", " are", text)
    text = re.sub(r"\'d", " would", text)
    text = re.sub(r"won't", "will not", text)
    text = re.sub(r"can't", "cannot", text)
    text = re.sub(r"[-()\"#/@;:<>{}+=~|.?,]", "", text)
    return text 
    
# questions cleaning 
clean_questions = []
for question in questions:
    clean_questions.append(clean_text(question))

# answers cleaning 
clean_answers = []
for answer in answers:
    clean_answers.append(clean_text(answer))

# 단어 횟수 세어 딕셔너리에 저장.
word2count = {}

# 정제된 질문, 답변 목록을 순회하며 빈도수 확인
for question in clean_questions:
    for word in question.split(): 
        if word not in word2count:
            word2count[word] = 1
        else:
            word2count[word] += 1 
            
for answer in clean_answers:
    for word in answer.split(): 
        if word not in word2count:
            word2count[word] = 1
        else:
            word2count[word] += 1 

# 임곗값 설정하고 임계값 아래 수 버리기, 정수로 매핑 
threshold = 20 

questionwords2int = {}

word_number = 0 
for word, count in word2count.items():
    if count >= threshold: # 빈도가 20 이상인 단어에 대해서
        questionwords2int[word] = word_number # 리스트에 입력, 이때 값은 더이상 빈도가 아닌 고유한 값.
        word_number += 1 

answerswords2int = {}

word_number = 0 
for word, count in word2count.items():
    if count >= threshold: # 빈도가 20 이상인 단어에 대해서
        answerswords2int[word] = word_number # 리스트에 입력, 이때 값은 더이상 빈도가 아닌 고유한 값.
        word_number += 1 

# 두 딕셔너리에 last tokens 만들기, 그러면 seq2seq 모델에 사용가능
# 시작할 때 SOS, 끝낼 때 EOS 로 인코딩 진행. 
# 바로 위 딕셔너리를 생성할 때 제외했던 임곗값이 넘지않은 단어들에 대해 (5%미만) 이들을 하나로 묶어 토큰으로 추가해 마지막에 넣어주자. 
tokens = ['<PAD>', '<EOS>', '<OUT>', '<SOS>']

for token in tokens:
    # 파이널 토큰을 생성하여 각 딕셔너리에 추가. 
    questionwords2int[token] = len(questionwords2int) + 1 
    answerswords2int[token] = len(answerswords2int) + 1 

# 모델을 구축할 때 인버스 매핑(숫자 <> 단어)하므로 기존 딕셔너리를 뒤집는다. 
# answerwords2int 만 뒤집는다. 
# 딕셔너리 인버스 요령을 알아두자. 

# 기존 딕셔너리의 키 w, 기존 딕셔너리의 값 w_i 을 가져와 w_i: w 로 키 <> 값 바꿔줌. 
answerints2word = {w_i: w for w, w_i in answerswords2int.items()}
# int > word 인 딕셔너리가 만들어졌다. 



# 문자열 끝 토큰인 EOS 를 추가해야함. 
# 모든 응답의 끝에 EOS 추가 (디코딩 레이어의 종단에 EOS 필요. EOS가 답변의 끝과 디코딩 부분 지정함.)
for i in range(len(clean_answers)): #문장 하나를 가져오기 위해 i 사용
    clean_answers[i] += ' <EOS>' #문장 끝나고 하나 띄우고.


# 모든 질문과 응답의 각 단어를 고유한 정수로 변환. 
# 후에 훈련 성능 최적화를 위해 길이별로 정렬
# out 토큰으로 필터링된 모든 단어를 교체

# clean question 의 클린 질문들에 대해 qustion_to_int 의 이차원 정수 배열로 각각 변환.
questions_into_int = [] # 정수로 나타낼 새로운 리스트 
for question in clean_questions:
    ints = [] #문장 단위 리스트 (결국 문장 단위로 다뤄야 하므로 이렇게 한다.)
    for word in question.split():
        if word not in questionwords2int: # clean 에는 있는데 딕셔너리에 없는 단어로써, 빈도가 안되어 없는 단어에 대해
            ints.append(questionwords2int['<OUT>']) #이 단어는 그냥 '<OUT>' 의 정수에 해당하는 값을 int 에 추가.
        else: 
            ints.append(questionwords2int[word]) # 그렇지 않은 경우 해당단어의 정수 값을 매핑해 추가. 
    questions_into_int.append(ints) # 하나의 문장이 다 끝나면 넣고 새로운 문장을 받으러 int 는 초기화. 
        
# clean_answers 의 클린 답변 문장들에 대해 answer_to_int 의 이차원 정수 배열로 각각 변환.
answers_into_int = []
for answer in clean_answers:
    ints = []
    for word in answer:
        if word not in answerswords2int:
            ints.append(answerswords2int['<OUT>'])
        else:
            ints.append(answerswords2int[word])
    answers_into_int.append(ints)

# 질문과 답변의 길이별로 질문과 답변 정렬(훈련 중 패딩의 양 줄임.)
sorted_clean_questions = []
sorted_clean_answers = []

for length in range(1, 25 + 1):
    for i in enumerate(questions_into_int):
        if len(i[1]) == length: 
            sorted_clean_questions.append(questions_into_int[i[0]])
            sorted_clean_answers.append(answers_into_int[i[0]])
            
profile
Mathematics, Algorithm, and IDEA for AI research🦖
post-custom-banner

0개의 댓글