구글 speech to text를 하기는 했다. 근데 일단 혼자있어서 내일 다른 사람이랑 같이 했을 때 화자 구분이 되는지를 확인해봐야한다. 그래도 정확도가 많이 좋다. 그리고 UI도 실제 채팅처럼 수정하고 시간도 띄워줘야하고, 생체 그래프랑도 연동해야하고.... 할게 많네..
코드는 아래에 보여드릴게요. 다른 블로그나 이런 곳 보면 부분만 보여주는 사람이 많아서 불편해서 저는 전체 코드로 올릴게요! 이게 불편하시다면 다른 글을 참조해주세요.. ㅎㅎ

import os
import asyncio
import json
import io
from websockets import serve
from google.cloud import speech_v1p1beta1 as speech
from pydub import AudioSegment
# Google Cloud Speech-to-Text 설정
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = " 직접 채우셔야합니다." # 서비스 계정 키 파일 경로 설정
# WebSocket 연결 처리 함수
async def handle_connection(websocket, path):
client = speech.SpeechClient()
async for audio_data in websocket:
try:
print(f"Received audio data of length: {len(audio_data)}")
# 오디오 데이터를 메모리 내에서 변환
audio = AudioSegment.from_file(io.BytesIO(audio_data), format="webm")
audio = audio.set_frame_rate(16000).set_channels(1)
# 16-bit PCM 형식으로 메모리 내 오디오 파일 저장
audio_bytes = io.BytesIO()
audio.export(audio_bytes, format="wav", parameters=["-acodec", "pcm_s16le"]) # Ensure 16-bit PCM
audio_bytes.seek(0)
# Google Cloud Speech-to-Text 요청 생성
audio_content = audio_bytes.read()
audio = speech.RecognitionAudio(content=audio_content)
diarization_config = speech.SpeakerDiarizationConfig(
enable_speaker_diarization=True,
min_speaker_count=1,
max_speaker_count=2
)
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=16000,
language_code="ko-KR",
diarization_config=diarization_config
)
response = client.recognize(config=config, audio=audio)
# 인식된 결과 처리
result = []
if response.results:
words_info = response.results[-1].alternatives[0].words
current_speaker = None
current_text = ""
start_time = None
for word_info in words_info:
# 화자 태그가 바뀌면 현재 문장 저장
if word_info.speaker_tag != current_speaker:
if current_speaker is not None and current_text:
result.append({
"start": start_time,
"end": word_info.start_time.total_seconds(),
"speaker": f"SPEAKER_{current_speaker}",
"text": current_text.strip()
})
# 새 화자 시작
current_speaker = word_info.speaker_tag
current_text = word_info.word
start_time = word_info.start_time.total_seconds()
else:
current_text += f" {word_info.word}"
# 마지막 화자에 대한 정보 추가
if current_text:
result.append({
"start": start_time,
"end": words_info[-1].end_time.total_seconds(),
"speaker": f"SPEAKER_{current_speaker}",
"text": current_text.strip()
})
# 결과 전송
await websocket.send(json.dumps(result))
print(f"Sent transcription result: {result}")
except Exception as e:
print(f"Error processing audio data: {e}")
await websocket.send(json.dumps({"error": str(e)}))
# WebSocket 서버 시작
async def main():
async with serve(handle_connection, "localhost", 8765):
print("WebSocket server started...")
await asyncio.Future() # 서버 계속 실행
# 서버 실행
asyncio.run(main())
그리고 flutter 코드
import 'dart:html' as html;
import 'dart:js' as js;
import 'package:flutter/material.dart';
import 'dart:typed_data';
import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';
final WebSocketChannel channel =
WebSocketChannel.connect(Uri.parse('ws://127.0.0.1:8765'));
void main() {
runApp(MaterialApp(
home: SpeakerChatPage(),
));
}
class SpeakerChatPage extends StatefulWidget {
@override
_SpeakerChatPageState createState() => _SpeakerChatPageState();
}
class _SpeakerChatPageState extends State<SpeakerChatPage> {
bool isRecording = false;
html.MediaRecorder? mediaRecorder;
List<Map<String, dynamic>> messages = [];
@override
void initState() {
super.initState();
js.context.callMethod('startAudioProcessing'); // JavaScript VAD 함수 호출
html.window.addEventListener('audioStarted', (event) {
if (!isRecording) {
startRecording();
}
});
html.window.addEventListener('audioStopped', (event) {
if (isRecording) {
stopRecording();
}
});
// WebSocket 데이터 수신 리스너
channel.stream.listen((message) {
final List<dynamic> decodedMessage = json.decode(message);
setState(() {
messages.addAll(decodedMessage.map((data) => {
"speaker": data['speaker'],
"text": data['text'],
"start": data['start'],
"end": data['end'],
}));
});
print("Received data from server: $decodedMessage");
});
}
void startRecording() async {
final stream =
await html.window.navigator.mediaDevices!.getUserMedia({'audio': true});
mediaRecorder = html.MediaRecorder(stream);
mediaRecorder!.addEventListener('dataavailable', (event) {
final blob = (event as html.BlobEvent).data;
if (blob != null) {
final reader = html.FileReader();
reader.readAsArrayBuffer(blob);
reader.onLoadEnd.listen((e) {
final bytes = Uint8List.fromList(reader.result as List<int>);
// WebSocket을 통해 데이터 전송 및 로그 출력
print("Sending audio data of length: ${bytes.length}");
channel.sink.add(bytes);
print("Audio data sent to server.");
});
} else {
print("No data available in the blob.");
}
});
mediaRecorder!.start(); // 녹음 시작 (자동으로 VAD에 의해 중지됨)
setState(() {
isRecording = true;
});
print("Recording started.");
}
void stopRecording() {
mediaRecorder?.stop();
setState(() {
isRecording = false;
});
print("Recording stopped.");
}
@override
void dispose() {
channel.sink.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Speaker Chat')),
body: messages.isEmpty
? Center(child: Text('No data available'))
: ListView.builder(
itemCount: messages.length,
itemBuilder: (context, index) {
final message = messages[index];
return ListTile(
title: Text(
'Speaker ${message['speaker']}',
style: TextStyle(
fontWeight: FontWeight.bold,
color: message['speaker'] == 'SPEAKER_00'
? Colors.blue
: Colors.green,
),
),
subtitle: Text(message['text']),
trailing: Text(
'${message['start'].toStringAsFixed(2)}s - ${message['end'].toStringAsFixed(2)}s',
),
);
},
),
);
}
}