FastAPI 서버 구축

SangYeon Min·2024년 2월 24일
0

PROJECT-HEARUS

목록 보기
2/12
post-thumbnail

Service Architecture

위 구조와 같이 기존의 MVP 모델을 변경하였다.
Express.js는 RN와 FastAPI와 연결되는 허브이며, FastAPI는 ML Pipeline만을 담당한다.
비동기 처리를 지원하며, 실시간 처리에 적합한 FastAPI로 AI-Serving 서버를 변경하였다.


FastAPI 서버 구축

pip install fastapi
pip install "uvicorn[standard]"

필요한 파이썬 패키지들을 설치해준다.

from typing import Union
from fastapi import FastAPI
from starlette.responses import FileResponse
from fastapi.staticfiles import StaticFiles 

app = FastAPI()

app.mount("/images", StaticFiles(directory="images"), name='images')

@app.get("/")
def read_root():
    return FileResponse('./templates/index.html')

이후 위와 같이 테스트 서버 코드를 작성하고 아래 명령어로 실행한다.

python -m uvicorn main:app --reload
python -m uvicorn main:app --host 0.0.0.0 --port 8000 --reload

현재 AI Model들을 구동하기 위해 별도의 PC를 로컬 네트워크에 설치해두었다.
따라서 0.0.0.0 호스트로 서버를 호스팅해준다.


Real-Time STT 구현

위와 같은 플로우를 가지는 Real-Time STT를 구현한다.
RN <> Express 소켓별로 NLP 데이터를 검증해야 하기에 Socket.io를 사용한다.
Express <> FastAPI 실시간 audioBlob 데이터, Buffer 데이터를 주고받기에 WebSocket을 사용한다.

Express <> FastAPI

pip install git+https://github.com/openai/whisper.git

먼저 whisper 라이브러리를 직접 설치한다.

from fastapi import FastAPI
from routers import websocket
from starlette.responses import FileResponse
from fastapi.staticfiles import StaticFiles 

app = FastAPI()

app.include_router(websocket.router)
app.mount("/images", StaticFiles(directory="images"), name='images')

@app.get("/")
def read_root():
    return FileResponse('./templates/index.html')

이후 위와 같이 websocket.router를 연결하여준다.

from fastapi import APIRouter, WebSocket

router = APIRouter()

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Configuring BE Socket")
    await websocket.accept()
    print("BE Socket Accepted")
    
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Echo: {data}")

테스트를 위해 간단하게 WebSocket을 연결할 수 있는 코드를 작성한다.

npm install socket.io-client
const socketIO = require('socket.io');
const ioClient = require('socket.io-client');
const Queue = require('better-queue');

const processAudioData = require("./processAudioData");
const processNLText = require("./processNLText");

function initSocket(server, app) {
    // Configure FE Socket
    console.log('Configuring FE Socket');
    const io = socketIO(server, {
        // Connection Timeout
        // Only for previous connection
        pingTimeout: 10000,
        cors: {
            credentials: true,
        },
        allowEIO3: true,
    });
    app.set('io', io);

    // Configure FastAPI Socket
    console.log('Configuring FastAPI Socket');
    const fastAPISocket = ioClient(process.env.FASTAPI_HOST + '/ws', {
        transports: ['websocket'],
        withCredentials: true,
    });

    io.on('connection', (clientSocket) => {
        console.log('FE Client [' + clientSocket.handshake.headers.origin + '] Socket.io Connected');

        const nlpQueue = new Queue((task, done) => {
            processNLText(task.clientSocket, task.textData)
                .then(() => done())
                .catch(err => done(err));
        }, { concurrent: 1 });

        clientSocket.on('transcription', async (audioBlob) => {
            processAudioData(clientSocket, fastAPISocket, audioBlob);
        });

        clientSocket.on('nlProcessing', async (textData) => {
            nlpQueue.push({ clientSocket, textData });
        });

        clientSocket.on('disconnect', () => {
            console.log('FE Client [' + clientSocket.handshake.headers.origin + '] Socket.io disconnected');
        });
    });

    fastAPISocket.on('connect', () => {
        console.log('FastAPI WebSocket Connected');
    });
}

module.exports = initSocket;

이후 Socket.io를 통해 FastAPI 서버에 연결한다.

Troubleshooting : connection rejected

INFO:     ('x.x.x.x', 62037) - "WebSocket /socket.io/?EIO=4&transport=websocket" 403
INFO:     connection rejected (403 Forbidden)
INFO:     connection closed

이때 위와 같이 connection rejected 오류가 발생하여 이를 해결하였다.
먼저 AI Model을 위한 PC에서 8000번 포트에 대한 인바운드 규칙을 허용해주었다.

npm install ws

이때 Socket.ioWebSocket간의 호환성 문제일 것이라 판단하여

const socketIO = require('socket.io');
const WebSocket = require('ws');
const Queue = require('better-queue');

const processAudioData = require("./processAudioData");
const processNLText = require("./processNLText");

function initSocket(server, app) {
    // Configure FE Socket.io
    // FE Client 데이터를 관리할 필요가 있어 Socket.io 사용
    console.log('Configuring FE Socket');
    const io = socketIO(server, {
        // Connection Timeout
        // Only for previous connection
        pingTimeout: 10000,
        cors: {
            credentials: true,
        },
        allowEIO3: true,
    });
    app.set('io', io);

    // Configure FastAPI WebSocket
    // 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
    console.log('Configuring FastAPI Socket');
    fastAPIURL = process.env.FASTAPI_HOST + '/ws';
    const fastAPISocket = new WebSocket(fastAPIURL);

    fastAPISocket.on('open', function open() {
        console.log('FastAPI WebSocket Connected');
        fastAPISocket.send('Hello Server!');
    });

    fastAPISocket.on('message', function incoming(data) {
        console.log('Received: %s', data);
    });

    fastAPISocket.on('close', function close() {
        console.log('FastAPI Web Socket Disconnected');
    });

    fastAPISocket.on('error', function error(error) {
        console.error('FastAPI Web Socket Error: ', error);
    });
  ...

위와 같이 WebSocket으로 Express.js에서 FastAPI로 연결하는 코드를 구현하여 문제를 해결하였다.

FastAPI WebSocket Connected
FE Client [http://localhost:8081] Socket.io Connected
Received: Echo: Hello Server!

WebSocketManager 구현

// sockets/webSocketManager.js
const WebSocket = require('ws');

// WebSocket Connection
class WebSocketManager {
    constructor() {
        this.socket = null;
    }

    connect(token) {
        console.log('Configuring FastAPI WebSocket')

        const fastAPIURL = process.env.FASTAPI_HOST + '/ws';
        this.socket = new WebSocket(fastAPIURL);

        // 미리 self에 this를 할당
        // scope와 무관하게 class의 socket을 활용할 수 있도록 함
        const self = this;

        this.socket.on('open', function open() {
            console.log('FastAPI WebSocket Connected');
            // 최초 연결시 websocketToken 전송
            self.socket.send(token);
        });

        this.socket.on('close', function close() {
            console.log('FastAPI Web Socket Disconnected');
            // 연결이 닫힐 때 자동으로 재연결 시도
            setTimeout(() => self.connect(token), 1000);
        });

        this.socket.on('error', function error(error) {
            console.error('FastAPI Web Socket Error');
            // 에러 발생시 연결 닫기
            self.socket.close();
        });
    }

    getSocket() {
        return this.socket;
    }
}

const wsManager = new WebSocketManager();

module.exports = wsManager;

이후 WebSocket을 실시간 음성처리로 활용하기 위해 위와 같이 WebSocketManager 클래스를 작성하였다.
또한 추가적으로 연결이 닫힐 때 자동으로 재연결 시도하는 로직을 작성해 연결성을 확보하였다

// /sockets/socketio.js
function initSocket(server, app) {
    // BE Client 식별자 Token
    const websocketToken = generateRandomToken(10);

    // Configure FE Socket.io
    // FE Client 데이터를 관리할 필요가 있어 Socket.io 사용
    console.log('Configuring FE Socket.io');
    const io = socketIO(server, {
        // Connection Timeout
        // Only for previous connection
        pingTimeout: 10000,
        cors: {
            credentials: true,
        },
        allowEIO3: true,
    });
    app.set('io', io);

    // Configure FastAPI WebSocket
    // 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
    // WebSocketManager로 연결성 확보
    wsManager.connect(websocketToken);
  ...

/sockets/socketio.js에서 wsManager을 통해 연결하게 코드를 수정해주었다.

Configuring FastAPI WebSocket
Server is listening at  3000
FastAPI WebSocket Connected
FE Client [http://localhost:8081] Socket.io Connected
FastAPI Web Socket Disconnected
Configuring FastAPI WebSocket
FastAPI WebSocket Connected
FastAPI Web Socket Disconnected
Configuring FastAPI WebSocket
FastAPI WebSocket Connected

STT Flow 구축

mkdir venvs
cd venvs

python -m venv hearus
cd hearus/Scripts
./activate

원활한 개발환경 구축을 위해 Python 가상환경을 설정해주었다.

source ./venvs/hearus/Scripts/activate

Git Bash의 경우 위 source 명령어를 사용하여 가상환경에 접속한다.

# FastAPI
fastapi
uvicorn

# Whisper Model Requirement
argparse
torch
torchvision
torchaudio
numpy
git+https://github.com/openai/whisper.git

이후 requirements.txt를 위와 같이 다시 한번 더 정리해주었다

  File HEARUS-AI-SERVING\routers\websocket.py", line 6, in <module>
    import whisper
  File "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\whisper\__init__.py", line 8, in <module>
    import torch
  File "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\torch\__init__.py", line 141, in <module>
    raise err
OSError: [WinError 126] 지정된 모듈을 찾을 수 없습니다. Error loading "HEARUS-AI-SERVING\venvs\hearus\Lib\site-packages\torch\lib\c10.dll" or one of its dependencies.

이때 \torch\lib\c10.dll관련 지정된 모듈을 찾을 수 없는 에러는 아래 vc_redist를 설치하여 해결하였다.
https://aka.ms/vs/16/release/vc_redist.x64.exe

from fastapi import APIRouter, WebSocket

import argparse
import os
import numpy as np
import torch
import whisper

import asyncio
from datetime import datetime, timedelta
from queue import Queue
from time import sleep
import threading

router = APIRouter()

# Thread safe Queue for passing data from the threaded recording callback.
data_queue = Queue()

# EventObject for Stopping Thread
stop_event = threading.Event()

Threading을 통해 WebSocket, Whisper의 기능을 다르게 하기 위한 모듈과 전역변수이다.

data_queue는 웹소켓에서 들어온 데이터를 push하여 지속적인 변환을 가능하게 하고
stop_event는 Thread를 중지하여 메모리 낭비를 막기 위해 사용한다.

def speechToText(whisper_model, websocket):
    print("[Whisper] STT Thread Executed")

    # The last time a recording was retrieved from the queue.
    phrase_time = None

     # Set Timeout, Transition List
    phrase_timeout = 2
    transcription = ['']

    while not stop_event.is_set():
        sleep(0.25)
        try:
            now = datetime.utcnow()
            # Pull raw recorded audio from the queue.
            if not data_queue.empty():
                phrase_complete = False
                # If enough time has passed between recordings, consider the phrase complete.
                # Clear the current working audio buffer to start over with the new data.
                if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
                    phrase_complete = True
                # This is the last time we received new audio data from the queue.
                phrase_time = now
                
                # Combine audio data from queue
                audio_data = b''.join(data_queue.queue)
                data_queue.queue.clear()
                
                # Convert in-ram buffer to something the model can use directly without needing a temp file.
                # Convert data from 16 bit wide integers to floating point with a width of 32 bits.
                # Clamp the audio stream frequency to a PCM wavelength compatible default of 32768hz max.
                audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0

                # Read the transcription.
                result = whisper_model.transcribe(audio_np, fp16=torch.cuda.is_available(), language="ko")
                text = result['text'].strip()

                # If we detected a pause between recordings, add a new item to our transcription.
                # Otherwise edit the existing one.
                if phrase_complete:
                    transcription.append(text)
                    for line in transcription:
                        print(line)
                    print("----------------------")
                else: 
                    transcription[-1] = text
        except Exception as e:
            print(f"Error processing audio data: {e}")
            break

위 코드는 phrase_timeout을 통해 phrase가 끝났는지 여부를 판단하고 지속적으로 텍스트를 변환하고, 변환이 완료되지 않은 경우에는 text를 수정하여 정확도를 높인다.

https://github.com/davabase/whisper_real_time/tree/master
위 Repository를 참고하여 개발을 진행하였다.

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("[WebSocket] Configuring BE Client WebSocket")
    await websocket.accept()

    # Load Model
    model = "medium"
    whisper_model = whisper.load_model(model)
    print("[Whisper] Model Loaded Successfully")

    # Accept WebSocket
    data = await websocket.receive_text()
    print("[WebSocket] BE Client [" + data + "] Accepted")

    # Execute STT Thread until WebSocket Disconnected
    stt_thread = threading.Thread(target=speechToText, args=(whisper_model, stop_event))
    stt_thread.start()

    # Receive AudioBlob
    try:
        while True:
            audioBlob = await websocket.receive_bytes()
            data_queue.put(audioBlob)
    except Exception as e:
        print(f"[WebSocket] WebSocket error: {e}")
    finally:
        print("[WebSocket] Connection Closed")
        stop_event.set()
        stt_thread.join()
        print("[Whisper] STT Thread Destroyed")

또한 WebSocket연결 이후 stt_thread를 실행시키고, audioBlob 데이터를 받아 data_queue에 지속적으로 push한다.

최종적으로는 Connection이 Closed 되었을 경우 stop_event.set()으로 메모리 낭비를 방지한다.

const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
const stream = require('stream');

// WebSocket Connection
class WebSocketManager {
    constructor() {
        this.socket = null;
        this.clientSocket = null;
    }

    ...

    // 메모리 내 버퍼 데이터를 PCM 형식으로 변환
    // 이후 WebSocket을 통해 전달
    transcribeBufferAndSend(bufferData) {
        try {
            const self = this;

            const readableStream = new stream.PassThrough();
            // 스트림에 버퍼 데이터 전달 및 종료
            readableStream.end(bufferData);

            let buffers = [];
            const command = ffmpeg(readableStream)
                .inputFormat('webm')
                .audioCodec('pcm_s16le')
                .audioFrequency(16000)
                .audioChannels(1)
                .format('s16le')
                .on('error', (err) => {
                    console.error('An error occurred: ' + err.message);
                })
                .on('end', () => {
                    console.log('Transcoding succeeded!');
                    // 수집된 데이터 청크를 하나의 버퍼로
                    const audioData = Buffer.concat(buffers);
                    if (self.socket.readyState === WebSocket.OPEN) {
                        // 합쳐진 버퍼 데이터를 WebSocket을 통해 전송
                        self.socket.send(audioData);
                    }
                });

            command.pipe(new stream.Writable({
                write(chunk, encoding, callback) {
                    buffers.push(chunk);
                    callback();
                }
            }));
        } catch (error) {
            console.error('Error during audio processing:', error);
        }
    }

    getSocket() {
        return this.socket;
    }
}

const wsManager = new WebSocketManager();

module.exports = wsManager;

Express.js에서는 WebSocketManager에서 데이터를 전송할 때 transcribeBufferAndSend함수를 구현하여 Whisper에서 사용할 수 없는 테스트용 Vue.js의 MediaRecorderwebm 코덱의 데이터를 PCM 형식으로 변환하고 전송하여 FastAPI 서버에서 바로 활용할 수 있도록 한다.

https://ffmpeg.org/download.html#build-windows
해당 함수를 사용하기 위해서는 ffmpeg를 설치해야 한다.

    // Configure FastAPI WebSocket
    // 빈번하게 ML Pipeline과의 데이터 교환이 있어 WebSocket 사용
    // WebSocketManager로 연결성 확보
    wsManager.connect(websocketToken);

    // FE socket.io connection
    io.on('connection', (clientSocket) => {
        ...

        clientSocket.on('transcription', async (audioBlob) => {
            // wsManager를 통해 FastAPI에 audioBlob 데이터 전송
            wsManager.transcribeBufferAndSend(audioBlob);
        });

wsManager.transcribeBufferAndSend(audioBlob)으로 데이터를 전송하면
위 이미지와 같이 이전보다 더욱 향상된 정확도로 실시간 STT 처리를 하는 것을 볼 수 있다.

FE Transporting

FastAPI

def speechToText(whisper_model, stop_event):
    print("[Whisper] STT Thread Executed")

    while not stop_event.is_set():
        sleep(0.25)
        try:
            ...

                # Read the transcription.
                result = whisper_model.transcribe(audio_np, fp16=torch.cuda.is_available(), language="ko")
                transcrition_result = result['text'].strip()
                print("[Whisper] Transition Result '" + transcrition_result + "'")

                if transcrition_result != '':
                    result_queue.put(transcrition_result)

위와 같이 Thread Safe한 result_queuetranscrition_result 를 push한다

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    ...

    # Receive AudioBlob
    try:
        while True:
            audioBlob = await websocket.receive_bytes()
            data_queue.put(audioBlob)

            while not result_queue.empty():
                print('[WebSocket] Send Result from Result_Queue')
                result = result_queue.get()
                await websocket.send_text(result)
                
            # Sleep for other async functions
            await asyncio.sleep(0)

    except Exception as e:
        print(f"[WebSocket] WebSocket error: {e}")
    finally:
        stop_event.set()
        stt_thread.join()

        # clear stop_event for next Socket Connection
        stop_event.clear()

        while not data_queue.empty():
                data_queue.get()

        while not result_queue.empty():
                result_queue.get()

        print("[WebSocket] Connection Closed")

websocket.receive_bytes()이후 result_queue를 검사하여 데이터를 전송한다.

또한 stop_eventstt_thread.join()이후에 초기화하여 Express.js의 연결이 끊겼을 때 지속적으로 연결할 수 있도록 코드를 수정한다.

Express.js

class WebSocketManager {
    constructor() {
        this.socket = null;
        this.clientSocket = null;
    }

    // Set Fe Client Socket.io
    setClientSocket(clientSocket) {
        console.log("[WebSocket] WebSocketManager Client Socket Configured");
        this.clientSocket = clientSocket;
    }

    connect(token) {
        ...

        // message를 받으면 Client Socket으로 전송
        this.socket.on('message', (result) => {
            console.log("[WebSocket] Transition Result : " + result);
            self.clientSocket.emit('transitionResult', result.toString());
        });
      ...
    }

또한 WebSocketManager에서 데이터를 받았을 때 FE로 데이터를 전송하여 Socket.io와 WebSocket STT 로직 구현을 완료한다.

0개의 댓글