python - google STT 정확도 테스트

0
post-thumbnail

Google STT를 이용하면서 일부 무료로 사용할 수 있어 감사하게 쓰고 있으면서도 한 편으로는 낮은 정확도와 소수점 첫째 자리까지밖에 지원하지 않는 Timestamp값에 불편함을 감수하고 사용하고 있기도 합니다. 그렇기에 어떻게 하면 더 정확도를 높일 수 있을지 고민하던 중에 음성을 잘라서 분석한다면 더 잘 분석할 수 있지 않을까? 라는 결론에 도달합니다. STT의 작동 원리를 잘 모르긴 하지만 계산해야 할 앞뒤 문맥이 적어지면 더 잘 분석 할 수 있지 않을까라는 가설입니다. 그래서 내가 가지고 있는 자료 중 문학작품을 낭송한 자료가 있어서 코드를 짜 테스트를 해보았고 그 과정을 공유해봅니다.


def get_stt_time_offsets(wave: bytes) -> list:
    '''
    음성을 STT 적용하여 문장과 word_time_offsets을 구합니다.
    :param wave: bytes 형태의 wave
    :retrun: sentesnce, word_list: STT문장, word_list = [단어, start_time, end_time]
    '''
    # Instantiates a client
    client = speech.SpeechClient()
    sample_rate = 44100

    audio = speech.RecognitionAudio(content=wave)

    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code="ko-KR",
        enable_word_time_offsets=True,
    )

    # Detects speech in the audio wave
    response = client.recognize(config=config, audio=audio)
    sentence = []
    word_list = []
    for result in response.results:
        alternative = result.alternatives[0]
        sentence.append(result.alternatives[0].transcript)
        for word_info in alternative.words:
            word_time = []
            word = word_info.word
            start_time = word_info.start_time.total_seconds()
            end_time = word_info.end_time.total_seconds()
            word_time.append(word)
            word_time.append(start_time)
            word_time.append(end_time)
            word_list.append(word_time)

    return sentence, word_list

이 함수는 google STT 사용 예제를 조금 변형한 것이고, 어절별 timestamp를 구하기 위해서 만든 함수입니다. 입력으로 wav를 bytes로 변환한 것을 사용합니다. wav를 filename으로 사용하려면 "content=wave"부분을 filename으로 바꾸면 됩니다. 여기서 return값으로 sentence만 사용할 것입니다.


def get_wav_list(path) -> list :
    """
    디렉토리의 wav 파일 리스트를 구한다.
    """
    file_list = os.listdir(path)
    sort_file_list = sorted(file_list)
    wav_list = [file for file in sort_file_list if file.endswith('.wav')]
    return wav_list


def get_json_list(path) -> list :
    """
    디렉토리의 json 파일 리스트를 구한다.
    """    
    file_list = os.listdir(path)
    sort_file_list = sorted(file_list)
    json_list = [file for file in sort_file_list if file.endswith('.json')]

    corpus_list = []
    for i in json_list:
        with open((path+i), "r", encoding='UTF8') as f:
            json_object = json.load(f)
            corpus_list.append(json_object)
    return corpus_list

그다음 분석에 사용할 음성 wav파일의 리스트와 음성 텍스트 데이터가 있는 json 파일 리스트를 구하는 함수를 작성하였습니다.

그런데 문장 단위로 분할된 여러 음성 파일 중에서 같은 원음성끼리 비교를 해야하기 때문에 이름이 같은 것끼리 모아야 합니다. 그래서 같은 이름을 가진 파일끼리 분류하는 함수를 작성했습니다.

def get_same_id(split_wav_list) -> list:
    """
    디렉토리에서 같은 wav에서 쪼개진 것들끼리 모아준다. 
    """
    doc_id = []
    doc_id2 = ""
    f_doc_id = []

    for idx, split in enumerate(split_wav_list):
        if idx != 0:
            if split[:11] == split_wav_list[idx-1][:11]:
                if doc_id2 != "":
                    doc_id.append(doc_id2)
                    doc_id.append(split)
                    doc_id2 = ""
                else:
                    doc_id.append(split)
                if idx == len(split_wav_list)-1:
                    f_doc_id.append(doc_id)
                    doc_id = []
            else:
                doc_id2 = split
                f_doc_id.append(doc_id)
                doc_id = []
        else:
            if split[:11] == split_wav_list[idx][:11]:
                doc_id.append(split)

    return f_doc_id

네, 이제 준비는 끝났습니다. 위에서 만든 3개의 함수를 가지고 wav파일을 text로 변환해봅시다.
총 3단계로 구성하였습니다.

문단단위(전체) wav 파일 STT
문장단위(쪼갠) wav 파일 STT
출력된 것 data 저장 - json


if __name__ == '__main__':
    path1 = '/home/stt_wav/'
    path2 = '/home/stt_split_file/'
    wav_list = get_wav_list(path1)
    split_wav_list = get_wav_list(path2)
    json_list = get_json_list(path2)
    # split된 디렉토리에서 이름이 같은 대본끼리 2차원 리스트로 저장하기
    f_doc_id = same_id(split_wav_list)
    data = []

먼저 데이터를 준비해봅시다. wav 리스트, json 객체를 불러옵니다.

1. 전체 wav 파일

    sentences  = []
    for wav in wav_list:
        wave = get_audio_bytes(path1+wav)
        sent, stt_time_offsets = get_stt_time_offsets(wave)
        sentences.append(sent)

전체 wav파일 리스트를 반복문으로 STT해서 text를 구합니다.

2. split_wav To stt

sentences_split = []
    for doc_id in f_doc_id:
        split_sent = []
        for id in doc_id:
            wave = get_audio_bytes(path2+id)
            sent, stt_time_offsets = get_stt_time_offsets(wave)
            split_sent.append(sent)
        sentences_split.append(split_sent)

쪼갠 wav 리스트를 반복문으로 STT해서 text를 구합니다.

3. original_text 특수문자 제거

    ori_text = []
    for txt in json_list:
        tt = []
        text = txt['text']['original']
        text2 = text.split('\n')
        for t in text2:
            text3 = re.findall(r'[A-Za-z0-9가-힣]+', t) 
            join_txt = " ".join(text3)
            tt.append(join_txt)
        ori_text.append(tt)

STT결과가 원문 텍스트와 일치하는지 확인하기 위해서 원본 텍스트에서 문장부호를 제거합니다. 왜냐하면 STT 출력값에는 한글만 출력되기 때문입니다. 텍스트를 어절별로 split하고 정규식을 사용하여 text3 변수에 한글만 뽑은 뒤, 다시 join 해서 저장합니다.

4. 전체 wav, 쪼갠 wav의 stt, 원문 text를 zip으로 묶어서 data에 append

    for txt, sent, cut in zip(ori_text, sentences, sentences_split):
        split_sent = cut   
        for idx, (t, c) in enumerate(zip(txt, cut)):
            if len(c) != 0:    
                match_rate = f'{SequenceMatcher(None, t, c[0]).ratio()*100:.1f}%'
                split_sent[idx].append(match_rate)
            else:
                split_sent[idx].append('0')

        data.append({
            "original_text": txt,
            "stt_all_Time": sent,
            "stt_split_sent": split_sent
        })

    with open('ip_stt_test.json', 'w', encoding='utf-8') as make_file:
        json.dump(data, make_file, ensure_ascii=False, indent="\t")

자, 이제 전체 wav, 쪼갠 wav의 STT, 원문 text를 zip으로 묶어서 data에 append합니다. 그리고 data를 json으로 저장합니다. 여기서 SequenceMatcher 라이브러리를 사용하여 두 문자열의 유사성을 수치화할 수 있습니다. STT 출력값에서 데이터에 빈값이 있을 수 있으므로 유사도는 0으로 넣어줬습니다.


"original_text": [
            "그런데 기이하게도 그 의문은 고통스러운 것이었다",
            "그의 삶을 아니 삶 이상의 것을 바쳤던 모든 문제가 하나의 환상같이만 보였다",
            "날 좀 내버려 둬 이 짐승 같은 놈들아",
            "그는 냉혹하게 말했다",
            "당신들은 그 독주로 영혼을 망가뜨려서 짐승이 돼 버린 거야",
            "당신들하고는 이미 끝났소",
            "오래전에 당신들 가슴 속을 다 뒤져보고 거기서 내가 추구하던 것을 찾지 못했소",
            "다들 꺼져 버리쇼"
        ],
        "stt_all_Time": [
            "그런데 기이하게도 그 임무는 고통스러운 것이었다
            그의 삶을 아니 3 이상의 것을 바쳤던 모든 문제가 하나의 환상 같지만 
            날 좀 내버려 둬 짐승 같은 놈들아
            - 4 / 말했다 
            당신들은 그 독주로 영혼을 망가뜨려서 짐승이 돼 버린 거야
            당신들하고는 이미 끝났어
            오래전에 당신들 가슴속을 다 뒤져 보고 거기서 추구하던 것을 찾지 못했어
            아들 꺼져 버렸어"
        ],
        "stt_split_sent": [
            [
                "그런데 기이하게도 그 임무는 고통스러운 것이었다",
                "88.5%"
            ],
            [
                "그의 삶을 아리 참 이상해 것을 받쳤던 모든 문제가 하나의 환상 같지만 보였다",
                "87.1%"
            ],
            [
                "날 좀 내버려 둬 이짐승 같은 놈들아",
                "97.6%"
            ],
            [
                "84 / 말했다",
                "52.6%"
            ],
            [
                "당신들은 그 독주로 영혼을 망가뜨려서 짐승이 돼 버린 거야",
                "100.0%"
            ],
            [
                "당신들 하고는 이미 끝났어",
                "88.9%"
            ],
            [
                "오래 전에 당신들 가슴속을 다 뒤져보고 거기서 내가 추구하던 것을 찾지 못했어",
                "95.3%"
            ],
            [
                "다들 꺼져 버려",
                "82.4%"
            ]

이 분석에 20개의 wav파일을 사용했기 때문에 통계적으로 일반화할 수는 없습니다. 다만, 보면 알겠지만, 전체 wav파일을 통으로 stt하는 것보다 문장, 발화 단위로 쪼개는 것이 더 정확도가 높아보입니다.

분석에 사용된 코드

import os 
import json
import re
from difflib import SequenceMatcher
from typing import List
from google.cloud import speech


def get_stt_time_offsets(wave: bytes) -> list:
    '''
    음성을 STT 적용하여 문장과 word_time_offsets을 구한다.
    :param wave: bytes 형태의 wave
    :retrun: sentesnce, word_list: STT문장, word_list = [단어, start_time, end_time]
    '''
    # Instantiates a client
    client = speech.SpeechClient()
    sample_rate = 44100

    audio = speech.RecognitionAudio(content=wave)

    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code="ko-KR",
        enable_word_time_offsets=True,
    )

    # Detects speech in the audio wave
    response = client.recognize(config=config, audio=audio)
    sentence = []
    word_list = []
    for result in response.results:
        alternative = result.alternatives[0]
        sentence.append(result.alternatives[0].transcript)
        for word_info in alternative.words:
            word_time = []
            word = word_info.word
            start_time = word_info.start_time.total_seconds()
            end_time = word_info.end_time.total_seconds()
            word_time.append(word)
            word_time.append(start_time)
            word_time.append(end_time)
            word_list.append(word_time)

    return sentence, word_list


def get_wav_list(path) -> list :
    file_list = os.listdir(path)
    sort_file_list = sorted(file_list)
    wav_list = [file for file in sort_file_list if file.endswith('.wav')]
    return wav_list


def get_json_list(path) -> list :
    """
    디렉토리의 json 파일 리스트를 구한다.
    """    
    # 디렉토리 경로 지정
    file_list = os.listdir(path)
    sort_file_list = sorted(file_list)
    json_list = [file for file in sort_file_list if file.endswith('.json')]

    corpus_list = []
    for i in json_list:
        with open((path+i), "r", encoding='UTF8') as f:
            json_object = json.load(f)
            corpus_list.append(json_object)
    return corpus_list


def same_id(split_wav_list) -> list:
    """
    디렉토리에서 같은 wav에서 쪼개진 것들끼리 모아준다. 
    """
    doc_id = []
    doc_id2 = ""
    f_doc_id = []

    for idx, split in enumerate(split_wav_list):
        if idx != 0:
            if split[:11] == split_wav_list[idx-1][:11]:
                if doc_id2 != "":
                    doc_id.append(doc_id2)
                    doc_id.append(split)
                    doc_id2 = ""
                else:
                    doc_id.append(split)
                if idx == len(split_wav_list)-1:
                    f_doc_id.append(doc_id)
                    doc_id = []
            else:
                doc_id2 = split
                f_doc_id.append(doc_id)
                doc_id = []
        else:
            if split[:11] == split_wav_list[idx][:11]:
                doc_id.append(split)

    return f_doc_id


if __name__ == '__main__':
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="speechtotext-fa4625d173.json"
    path1 = '/home/ubuntu/yugwon/IP_boundary/stt_wav/'
    path2 = '/home/ubuntu/yugwon/IP_boundary/stt_split_file/'
    wav_list = get_wav_list(path1)
    split_wav_list = get_wav_list(path2)
    json_list = get_json_list(path2)
    # split된 디렉토리에서 이름이 같은 대본끼리 2차원 리스트로 저장하기
    f_doc_id = same_id(split_wav_list)
    data = []

    # 1. 전체 wav 파일 stt
    sentences  = []
    for wav in wav_list:
        wave = get_audio_bytes(path1+wav)
        sent, stt_time_offsets = get_stt_time_offsets(wave)
        sentences.append(sent)

    # 2. split_wav To stt
    sentences_split = []
    for doc_id in f_doc_id:
        split_sent = []
        for id in doc_id:
            wave = get_audio_bytes(path2+id)
            sent, stt_time_offsets = get_stt_time_offsets(wave)
            split_sent.append(sent)
        sentences_split.append(split_sent)

    # 3. original_text 특수문자 제거
    ori_text = []
    for txt in json_list:
        tt = []
        text = txt['text']['original']
        text2 = text.split('\n')
        for t in text2:
            text3 = re.findall(r'[A-Za-z0-9가-힣]+', t) 
            join_txt = " ".join(text3)
            tt.append(join_txt)
        ori_text.append(tt)

    # 전체 wav, 쪼갠 wav의 stt, 원문 text zip으로 묶어서 data에 append하기 
    for txt, sent, cut in zip(ori_text, sentences, sentences_split):
        split_sent = cut   
        for idx, (t, c) in enumerate(zip(txt, cut)):
            if len(c) != 0:    
                match_rate = f'{SequenceMatcher(None, t, c[0]).ratio()*100:.1f}%'
                split_sent[idx].append(match_rate)
            else:
                split_sent[idx].append('0')

        data.append({
            "original_text": txt,
            "stt_all_Time": sent,
            "stt_split_sent": split_sent
        })

    with open('ip_stt_test.json', 'w', encoding='utf-8') as make_file:
        json.dump(data, make_file, ensure_ascii=False, indent="\t")
profile
나 응애👶 개발자, 딥린이👨‍💻, 언어 연구자 👨‍🎓

0개의 댓글