위 구조와 같이 기존의 MVP 모델을 변경하였다.
Express.js는 RN와 FastAPI와 연결되는 허브이며, FastAPI
는 ML Pipeline만을 담당한다.
비동기 처리를 지원하며, 실시간 처리에 적합한 FastAPI로 AI-Serving 서버를 변경하였다.
pip install fastapi
pip install "uvicorn[standard]"
필요한 파이썬 패키지들을 설치해준다.
from typing import Union
from fastapi import FastAPI
from starlette.responses import FileResponse
from fastapi.staticfiles import StaticFiles
app = FastAPI()
app.mount("/images", StaticFiles(directory="images"), name='images')
@app.get("/")
def read_root():
return FileResponse('./templates/index.html')
이후 위와 같이 테스트 서버 코드를 작성하고 아래 명령어로 실행한다.
python -m uvicorn main:app --reload
python -m uvicorn main:app --host 0.0.0.0 --port 8000 --reload
현재 AI Model들을 구동하기 위해 별도의 PC를 로컬 네트워크에 설치해두었다.
따라서 0.0.0.0
호스트로 서버를 호스팅해준다.
위와 같은 플로우를 가지는 Real-Time STT를 구현한다.
RN <> Express
소켓별로 NLP 데이터를 검증해야 하기에 Socket.io를 사용한다.
Express <> FastAPI
실시간 audioBlob 데이터, Buffer 데이터를 주고받기에 WebSocket을 사용한다.
pip install git+https://github.com/openai/whisper.git
먼저 whisper
라이브러리를 직접 설치한다.
from fastapi import FastAPI
from routers import websocket
from starlette.responses import FileResponse
from fastapi.staticfiles import StaticFiles
app = FastAPI()
app.include_router(websocket.router)
app.mount("/images", StaticFiles(directory="images"), name='images')
@app.get("/")
def read_root():
return FileResponse('./templates/index.html')
이후 위와 같이 websocket.router
를 연결하여준다.
from fastapi import APIRouter, WebSocket
router = APIRouter()
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
print("Configuring BE Socket")
await websocket.accept()
print("BE Socket Accepted")
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
테스트를 위해 간단하게 WebSocket
을 연결할 수 있는 코드를 작성한다.
npm install socket.io-client
const socketIO = require('socket.io');
const ioClient = require('socket.io-client');
const Queue = require('better-queue');
const processAudioData = require("./processAudioData");
const processNLText = require("./processNLText");
function initSocket(server, app) {
// Configure FE Socket
console.log('Configuring FE Socket');
const io = socketIO(server, {
// Connection Timeout
// Only for previous connection
pingTimeout: 10000,
cors: {
credentials: true,
},
allowEIO3: true,
});
app.set('io', io);
// Configure FastAPI Socket
console.log('Configuring FastAPI Socket');
const fastAPISocket = ioClient(process.env.FASTAPI_HOST + '/ws', {
transports: ['websocket'],
withCredentials: true,
});
io.on('connection', (clientSocket) => {
console.log('FE Client [' + clientSocket.handshake.headers.origin + '] Socket.io Connected');
const nlpQueue = new Queue((task, done) => {
processNLText(task.clientSocket, task.textData)
.then(() => done())
.catch(err => done(err));
}, { concurrent: 1 });
clientSocket.on('transcription', async (audioBlob) => {
processAudioData(clientSocket, fastAPISocket, audioBlob);
});
clientSocket.on('nlProcessing', async (textData) => {
nlpQueue.push({ clientSocket, textData });
});
clientSocket.on('disconnect', () => {
console.log('FE Client [' + clientSocket.handshake.headers.origin + '] Socket.io disconnected');
});
});
fastAPISocket.on('connect', () => {
console.log('FastAPI WebSocket Connected');
});
}
module.exports = initSocket;
이후 Socket.io
를 통해 FastAPI 서버에 연결한다.
INFO: ('x.x.x.x', 62037) - "WebSocket /socket.io/?EIO=4&transport=websocket" 403
INFO: connection rejected (403 Forbidden)
INFO: connection closed
이때 위와 같이 connection rejected
오류가 발생하여 이를 해결하였다.
먼저 AI Model을 위한 PC에서 8000
번 포트에 대한 인바운드 규칙을 허용해주었다.
npm install ws
이때 Socket.io
와 WebSocket
간의 호환성 문제일 것이라 판단하여
const socketIO = require('socket.io');
const WebSocket = require('ws');
const Queue = require('better-queue');
const processAudioData = require("./processAudioData");
const processNLText = require("./processNLText");
function initSocket(server, app) {
// Configure FE Socket.io
// FE Client 데이터를 관리할 필요가 있어 Socket.io 사용
console.log('Configuring FE Socket');
const io = socketIO(server, {
// Connection Timeout
// Only for previous connection
pingTimeout: 10000,
cors: {
credentials: true,
},
allowEIO3: true,
});
app.set('io', io);
// Configure FastAPI WebSocket
// 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
console.log('Configuring FastAPI Socket');
fastAPIURL = process.env.FASTAPI_HOST + '/ws';
const fastAPISocket = new WebSocket(fastAPIURL);
fastAPISocket.on('open', function open() {
console.log('FastAPI WebSocket Connected');
fastAPISocket.send('Hello Server!');
});
fastAPISocket.on('message', function incoming(data) {
console.log('Received: %s', data);
});
fastAPISocket.on('close', function close() {
console.log('FastAPI Web Socket Disconnected');
});
fastAPISocket.on('error', function error(error) {
console.error('FastAPI Web Socket Error: ', error);
});
...
위와 같이 WebSocket으로 Express.js에서 FastAPI로 연결하는 코드를 구현하여 문제를 해결하였다.
FastAPI WebSocket Connected
FE Client [http://localhost:8081] Socket.io Connected
Received: Echo: Hello Server!
// sockets/webSocketManager.js
const WebSocket = require('ws');
// WebSocket Connection
class WebSocketManager {
constructor() {
this.socket = null;
}
connect(token) {
console.log('Configuring FastAPI WebSocket')
const fastAPIURL = process.env.FASTAPI_HOST + '/ws';
this.socket = new WebSocket(fastAPIURL);
// 미리 self에 this를 할당
// scope와 무관하게 class의 socket을 활용할 수 있도록 함
const self = this;
this.socket.on('open', function open() {
console.log('FastAPI WebSocket Connected');
// 최초 연결시 websocketToken 전송
self.socket.send(token);
});
this.socket.on('close', function close() {
console.log('FastAPI Web Socket Disconnected');
// 연결이 닫힐 때 자동으로 재연결 시도
setTimeout(() => self.connect(token), 1000);
});
this.socket.on('error', function error(error) {
console.error('FastAPI Web Socket Error');
// 에러 발생시 연결 닫기
self.socket.close();
});
}
getSocket() {
return this.socket;
}
}
const wsManager = new WebSocketManager();
module.exports = wsManager;
이후 WebSocket
을 실시간 음성처리로 활용하기 위해 위와 같이 WebSocketManager
클래스를 작성하였다.
또한 추가적으로 연결이 닫힐 때 자동으로 재연결 시도하는 로직을 작성해 연결성을 확보하였다
// /sockets/socketio.js
function initSocket(server, app) {
// BE Client 식별자 Token
const websocketToken = generateRandomToken(10);
// Configure FE Socket.io
// FE Client 데이터를 관리할 필요가 있어 Socket.io 사용
console.log('Configuring FE Socket.io');
const io = socketIO(server, {
// Connection Timeout
// Only for previous connection
pingTimeout: 10000,
cors: {
credentials: true,
},
allowEIO3: true,
});
app.set('io', io);
// Configure FastAPI WebSocket
// 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
// WebSocketManager로 연결성 확보
wsManager.connect(websocketToken);
...
/sockets/socketio.js
에서 wsManager
을 통해 연결하게 코드를 수정해주었다.
Configuring FastAPI WebSocket
Server is listening at 3000
FastAPI WebSocket Connected
FE Client [http://localhost:8081] Socket.io Connected
FastAPI Web Socket Disconnected
Configuring FastAPI WebSocket
FastAPI WebSocket Connected
FastAPI Web Socket Disconnected
Configuring FastAPI WebSocket
FastAPI WebSocket Connected
mkdir venvs
cd venvs
python -m venv hearus
cd hearus/Scripts
./activate
원활한 개발환경 구축을 위해 Python 가상환경을 설정해주었다.
source ./venvs/hearus/Scripts/activate
Git Bash의 경우 위 source
명령어를 사용하여 가상환경에 접속한다.
# FastAPI
fastapi
uvicorn
# Whisper Model Requirement
argparse
torch
torchvision
torchaudio
numpy
git+https://github.com/openai/whisper.git
이후 requirements.txt
를 위와 같이 다시 한번 더 정리해주었다
File HEARUS-AI-SERVING\routers\websocket.py", line 6, in <module>
import whisper
File "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\whisper\__init__.py", line 8, in <module>
import torch
File "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\torch\__init__.py", line 141, in <module>
raise err
OSError: [WinError 126] 지정된 모듈을 찾을 수 없습니다. Error loading "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\torch\lib\c10.dll" or one of its dependencies.
이때 \torch\lib\c10.dll
관련 지정된 모듈을 찾을 수 없는 에러는 아래 vc_redist
를 설치하여 해결하였다.
https://aka.ms/vs/16/release/vc_redist.x64.exe
from fastapi import APIRouter, WebSocket
import argparse
import os
import numpy as np
import torch
import whisper
import asyncio
from datetime import datetime, timedelta
from queue import Queue
from time import sleep
import threading
router = APIRouter()
# Thread safe Queue for passing data from the threaded recording callback.
data_queue = Queue()
# EventObject for Stopping Thread
stop_event = threading.Event()
Threading을 통해 WebSocket
, Whisper
의 기능을 다르게 하기 위한 모듈과 전역변수이다.
data_queue
는 웹소켓에서 들어온 데이터를 push하여 지속적인 변환을 가능하게 하고
stop_event
는 Thread를 중지하여 메모리 낭비를 막기 위해 사용한다.
def speechToText(whisper_model, websocket):
print("[Whisper] STT Thread Executed")
# The last time a recording was retrieved from the queue.
phrase_time = None
# Set Timeout, Transition List
phrase_timeout = 2
transcription = ['']
while not stop_event.is_set():
sleep(0.25)
try:
now = datetime.utcnow()
# Pull raw recorded audio from the queue.
if not data_queue.empty():
phrase_complete = False
# If enough time has passed between recordings, consider the phrase complete.
# Clear the current working audio buffer to start over with the new data.
if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
phrase_complete = True
# This is the last time we received new audio data from the queue.
phrase_time = now
# Combine audio data from queue
audio_data = b''.join(data_queue.queue)
data_queue.queue.clear()
# Convert in-ram buffer to something the model can use directly without needing a temp file.
# Convert data from 16 bit wide integers to floating point with a width of 32 bits.
# Clamp the audio stream frequency to a PCM wavelength compatible default of 32768hz max.
audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
# Read the transcription.
result = whisper_model.transcribe(audio_np, fp16=torch.cuda.is_available(), language="ko")
text = result['text'].strip()
# If we detected a pause between recordings, add a new item to our transcription.
# Otherwise edit the existing one.
if phrase_complete:
transcription.append(text)
for line in transcription:
print(line)
print("----------------------")
else:
transcription[-1] = text
except Exception as e:
print(f"Error processing audio data: {e}")
break
위 코드는 phrase_timeout
을 통해 phrase가 끝났는지 여부를 판단하고 지속적으로 텍스트를 변환하고, 변환이 완료되지 않은 경우에는 text를 수정하여 정확도를 높인다.
https://github.com/davabase/whisper_real_time/tree/master
위 Repository를 참고하여 개발을 진행하였다.
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
print("[WebSocket] Configuring BE Client WebSocket")
await websocket.accept()
# Load Model
model = "medium"
whisper_model = whisper.load_model(model)
print("[Whisper] Model Loaded Successfully")
# Accept WebSocket
data = await websocket.receive_text()
print("[WebSocket] BE Client [" + data + "] Accepted")
# Execute STT Thread until WebSocket Disconnected
stt_thread = threading.Thread(target=speechToText, args=(whisper_model, stop_event))
stt_thread.start()
# Receive AudioBlob
try:
while True:
audioBlob = await websocket.receive_bytes()
data_queue.put(audioBlob)
except Exception as e:
print(f"[WebSocket] WebSocket error: {e}")
finally:
print("[WebSocket] Connection Closed")
stop_event.set()
stt_thread.join()
print("[Whisper] STT Thread Destroyed")
또한 WebSocket
연결 이후 stt_thread
를 실행시키고, audioBlob
데이터를 받아 data_queue
에 지속적으로 push한다.
최종적으로는 Connection이 Closed 되었을 경우 stop_event.set()
으로 메모리 낭비를 방지한다.
const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
const stream = require('stream');
// WebSocket Connection
class WebSocketManager {
constructor() {
this.socket = null;
this.clientSocket = null;
}
...
// 메모리 내 버퍼 데이터를 PCM 형식으로 변환
// 이후 WebSocket을 통해 전달
transcribeBufferAndSend(bufferData) {
try {
const self = this;
const readableStream = new stream.PassThrough();
// 스트림에 버퍼 데이터 전달 및 종료
readableStream.end(bufferData);
let buffers = [];
const command = ffmpeg(readableStream)
.inputFormat('webm')
.audioCodec('pcm_s16le')
.audioFrequency(16000)
.audioChannels(1)
.format('s16le')
.on('error', (err) => {
console.error('An error occurred: ' + err.message);
})
.on('end', () => {
console.log('Transcoding succeeded!');
// 수집된 데이터 청크를 하나의 버퍼로
const audioData = Buffer.concat(buffers);
if (self.socket.readyState === WebSocket.OPEN) {
// 합쳐진 버퍼 데이터를 WebSocket을 통해 전송
self.socket.send(audioData);
}
});
command.pipe(new stream.Writable({
write(chunk, encoding, callback) {
buffers.push(chunk);
callback();
}
}));
} catch (error) {
console.error('Error during audio processing:', error);
}
}
getSocket() {
return this.socket;
}
}
const wsManager = new WebSocketManager();
module.exports = wsManager;
Express.js에서는 WebSocketManager
에서 데이터를 전송할 때 transcribeBufferAndSend
함수를 구현하여 Whisper
에서 사용할 수 없는 테스트용 Vue.js의 MediaRecorder
의 webm
코덱의 데이터를 PCM
형식으로 변환하고 전송하여 FastAPI 서버에서 바로 활용할 수 있도록 한다.
https://ffmpeg.org/download.html#build-windows
해당 함수를 사용하기 위해서는 ffmpeg를 설치해야 한다.
// Configure FastAPI WebSocket
// 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
// WebSocketManager로 연결성 확보
wsManager.connect(websocketToken);
// FE socket.io connection
io.on('connection', (clientSocket) => {
...
clientSocket.on('transcription', async (audioBlob) => {
// wsManager를 통해 FastAPI에 audioBlob 데이터 전송
wsManager.transcribeBufferAndSend(audioBlob);
});
wsManager.transcribeBufferAndSend(audioBlob)
으로 데이터를 전송하면
위 이미지와 같이 이전보다 더욱 향상된 정확도로 실시간 STT 처리를 하는 것을 볼 수 있다.
def speechToText(whisper_model, stop_event):
print("[Whisper] STT Thread Executed")
while not stop_event.is_set():
sleep(0.25)
try:
...
# Read the transcription.
result = whisper_model.transcribe(audio_np, fp16=torch.cuda.is_available(), language="ko")
transcrition_result = result['text'].strip()
print("[Whisper] Transition Result '" + transcrition_result + "'")
if transcrition_result != '':
result_queue.put(transcrition_result)
위와 같이 Thread Safe한 result_queue
에 transcrition_result
를 push한다
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
...
# Receive AudioBlob
try:
while True:
audioBlob = await websocket.receive_bytes()
data_queue.put(audioBlob)
while not result_queue.empty():
print('[WebSocket] Send Result from Result_Queue')
result = result_queue.get()
await websocket.send_text(result)
# Sleep for other async functions
await asyncio.sleep(0)
except Exception as e:
print(f"[WebSocket] WebSocket error: {e}")
finally:
stop_event.set()
stt_thread.join()
# clear stop_event for next Socket Connection
stop_event.clear()
while not data_queue.empty():
data_queue.get()
while not result_queue.empty():
result_queue.get()
print("[WebSocket] Connection Closed")
websocket.receive_bytes()
이후 result_queue
를 검사하여 데이터를 전송한다.
또한 stop_event
를 stt_thread.join()
이후에 초기화하여 Express.js의 연결이 끊겼을 때 지속적으로 연결할 수 있도록 코드를 수정한다.
class WebSocketManager {
constructor() {
this.socket = null;
this.clientSocket = null;
}
// Set Fe Client Socket.io
setClientSocket(clientSocket) {
console.log("[WebSocket] WebSocketManager Client Socket Configured");
this.clientSocket = clientSocket;
}
connect(token) {
...
// message를 받으면 Client Socket으로 전송
this.socket.on('message', (result) => {
console.log("[WebSocket] Transition Result : " + result);
self.clientSocket.emit('transitionResult', result.toString());
});
...
}
또한 WebSocketManager
에서 데이터를 받았을 때 FE로 데이터를 전송하여 Socket.io와 WebSocket STT 로직 구현을 완료한다.