Spring Boot 실시간 음성 전송 로직

SangYeon Min·2024년 5월 12일
0

PROJECT-HEARUS

목록 보기
5/12
post-thumbnail

위 구조로 기존의 Express.js 서버의 실시간 음성 데이터 전송 로직을 구현한다.

Netty-socketio

dependencies {
	...
	// Socket
	implementation 'com.corundumstudio.socketio:netty-socketio:2.0.3'
}

현재는 Vue.js 테스트 Client로 기능을 구현하고 있지만
추후 React-Native로 기능을 구현할 것이기 때문에 Socket.io를 사용한다.

이때 Java에서 사용할 수 있는 netty-socketio 라이브러리를 설치한다.

// application-private.properties

# Socket
SOCKET_IO_SERVER=127.0.0.1
SOCKET_IO_PORT=9092

netty-socketio 서버가 Open될 주소와 포트를 명시한다.

package com.hearus.hearusspring.data.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SocketMsgDTO {
    String message;

    @Override
    public String toString(){
        return message;
    }
}

이후 테스트를 위해 SocketMsgDTO를 위와 같이 구현한다.

package com.hearus.hearusspring.common.socket;

import com.corundumstudio.socketio.SocketIOServer;
import com.hearus.hearusspring.common.environment.ConfigUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SocketIOConfig {
    @Bean
    public SocketIOServer socketIOServer(ConfigUtil configUtil) {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setHostname(configUtil.getProperty("socketio.host"));
        config.setPort(Integer.parseInt(configUtil.getProperty("socketio.port")));
        config.setTransports(Transport.WEBSOCKET, Transport.POLLING);

        return new SocketIOServer(config);
    }

    @Bean
    public CorsFilter corsFilter() {
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        CorsConfiguration config = new CorsConfiguration();
        config.setAllowCredentials(true);
        config.setAllowedOrigins(Arrays.asList("*"));
        config.setAllowedHeaders(Arrays.asList("*"));
        config.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "DELETE", "OPTIONS"));
        source.registerCorsConfiguration("/**", config);
        return new CorsFilter(source);
    }
}

또한 위와 같이 CORS 관련 Filter를 설정하는 함수를 선언하고
SocketIOServer 서버를 반환하여주는 SockeIOConfig Class를 구현한다.

package com.hearus.hearusspring.common.socket;

import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.hearus.hearusspring.data.dto.SocketMsgDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class NettySocketio {
    private final SocketIONamespace namespace;

    @Autowired
    public NettySocketio(SocketIOServer server){
        this.namespace = server.addNamespace("/socketio");
        this.namespace.addConnectListener(onConnected());
        this.namespace.addDisconnectListener(onDisconnected());
        this.namespace.addEventListener("stt", SocketMsgDTO.class, onReceived());
    }

    private DataListener<SocketMsgDTO> onReceived() {
        return (client, data, ackSender) -> {
            log.info("[NettySocketio]-[onReceived]-[{}] Received message '{}'", client.getSessionId().toString(), data);
            namespace.getBroadcastOperations().sendEvent("stt", data);
        };
    }

    private ConnectListener onConnected() {
        return client -> {
            HandshakeData handshakeData = client.getHandshakeData();
            log.info("[NettySocketio]-[onConnected]-[{}] Connected to Socket Module through '{}'", client.getSessionId().toString(), handshakeData.getUrl());
        };
    }

    private DisconnectListener onDisconnected() {
        return client -> {
            log.info("[NettySocketio]-[onDisconnected]-[{}] Disconnected from Socket Module.", client.getSessionId().toString());
        };
    }
}

NettySocketio Class를 통해 위와 같으 Listener 형태로 Connection이 활성화 되었을 때, 메세지를 받았을 때 등의 이벤트를 처리한다.

package com.hearus.hearusspring.common.socket;

import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SocketIOCommandLineRunner implements CommandLineRunner {
    private final SocketIOServer server;

    @Autowired
    public SocketIOCommandLineRunner(SocketIOServer server) {
        log.info("[SocketIOCommandLineRunner] SocketIOServer Initialized");
        this.server = server;
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("[SocketIOCommandLineRunner] SocketIOServer Running");
        server.start();
    }
}

최종적으로 CommandLineRunner를 통해 SocketIO 서버를 구동한다.
위와 같이 정상적으로 서버가 구동되는 것을 볼 수 있고
Postman을 통해 Spring 서버에서 정상적으로 메세지를 받는 것을 볼 수 있다.

Vue main.js

import { createApp } from 'vue';
import App from './App.vue';
import io from 'socket.io-client';

console.log("process.env.VUE_APP_BACKEND_HOST : ", process.env.VUE_APP_BACKEND_HOST)
console.log("process.env.VUE_APP_SOCKETIO_HOST : ", process.env.VUE_APP_SOCKETIO_HOST)
console.log("window.VUE_APP_K8S : ", window.VUE_APP_K8S);

let socketUrl = process.env.NODE_ENV === 'development' 
  ? '${process.env.VUE_APP_SOCKETIO_HOST}' 
  : '${window.VUE_APP_K8S}';
  console.log("socketURL : ", socketUrl)

const socket = io(socketUrl, {
  path: '/socket.io',
  transports: ['websocket'],
  extraHeaders: {
    'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits'
  },
  reconnection: true,
  reconnectionDelay: 5000,
  reconnectionAttempts: Infinity
});

const app = createApp(App);
app.provide('socket', socket);
app.config.productionTip = false;

app.mount('#app');

Vue.js의 경우 위와 같이 transportsextraHeaders를 추가하여 Spring 서버의 Socket.io를 통한 소켓 접속을 establish할 수 있다.

WebRTC Proxy

WebRTC를 사용하면 개방형 표준 외에 작동하는 실시간 통신 기능을 애플리케이션에 추가할 수 있습니다. 동영상, 음성, 일반 데이터를 동종 앱 간에 전송할 수 있어 개발자가 강력한 음성 및 영상 통신 솔루션을 구축할 수 있습니다. 이 기술은 모든 최신 브라우저뿐만 아니라 모든 주요 플랫폼의 기본 클라이언트에서 사용할 수 있습니다. WebRTC를 뒷받침하는 기술은 개방형 웹 표준으로 구현되며 모든 주요 브라우저에서 일반 자바스크립트 API로 제공됩니다. Android 및 iOS 애플리케이션과 같은 네이티브 클라이언트의 경우 동일한 기능을 제공하는 라이브러리를 사용할 수 있습니다. WebRTC 프로젝트는 오픈소스이며 Apple, Google, Microsoft, Mozilla 등의 지원을 받습니다. Google WebRTC팀에서 관리하는 페이지입니다.
https://webrtc.org/?hl=ko

WebRTC 프로젝트를 직접 활용하는 것은 아니지만 해당 구조를 차용하여

Spring Boot에서는 기존 Express.js 서버와는 다르게 React-Native 클라이언트와 FastAPI 서버 간의 Audio Data, 메세지를 포워딩 해주는 방식으로 실시간 음성인식을 구현하였다.

추후 프로젝트의 코드를 리팩토링하는 과정에서 WebRTC 프로젝트를 적용하여, Spring 서버를 Sinaling 서버로 활용하여 서비스의 성능을 향상시킬 계획이다.

Spring

// application-private.properties

FAST_API_ENDPOINT=ws://localhost:8000

이를 구현하기 위해 위와 같이 FAST_API_ENDPOINT를 정의한다.

package com.hearus.hearusspring.common.nettySocketio;

import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.hearus.hearusspring.common.environment.ConfigUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class WebRTCProxy {

    private final String FastAPIEndpoint;
    private final SocketIOServer server;
    private final SocketIONamespace namespace;
    private WebSocket fastAPIWebSocket;
    private final ScheduledExecutorService reconnectExecutor;

    @Autowired
    public WebRTCProxy(SocketIOServer server, ConfigUtil configUtil) {
        this.server = server;
        this.FastAPIEndpoint = configUtil.getProperty("FAST_API_ENDPOINT");
        this.namespace = server.addNamespace("/webrtc");
        this.namespace.addConnectListener(onConnected());
        this.namespace.addDisconnectListener(onDisconnected());
        this.namespace.addEventListener("transcription", byte[].class, audioListener());
        this.reconnectExecutor = Executors.newSingleThreadScheduledExecutor();

        connectToFastAPI();
    }

    private ConnectListener onConnected() {
        return client -> {
            HandshakeData handshakeData = client.getHandshakeData();
            log.info("[WebRTCProxy]-[onConnected]-[{}] Connected to WebRTCProxy Socketio through '{}'", client.getSessionId().toString(), handshakeData.getUrl());
        };
    }

    private DisconnectListener onDisconnected() {
        return client -> {
            log.info("[WebRTCProxy]-[onDisconnected]-[{}] Disconnected from WebRTCProxy Socketio Module.", client.getSessionId().toString());
        };
    }

    private DataListener<byte[]> audioListener() {
        return (client, audioData, ackSender) -> {
            try {
                if (fastAPIWebSocket != null && !fastAPIWebSocket.isInputClosed()) {
                    log.info("[WebRTCProxy]-[audioListener] Forwarding audio data to FastAPI server");
                    fastAPIWebSocket.sendBinary(ByteBuffer.wrap(audioData), true);
                } else {
                    log.error("[WebRTCProxy]-[audioListener] FastAPI WebSocket is not connected");
                }
            } catch(Exception ex){
                log.error("[WebRTCProxy]-[audioListener] Invalid Audio Data");
                ex.printStackTrace();
            }
        };
    }

    private void connectToFastAPI() {
        HttpClient client = HttpClient.newHttpClient();
        CompletionStage<WebSocket> webSocketCompletionStage = client.newWebSocketBuilder()
                .buildAsync(URI.create(FastAPIEndpoint + "/ws"), new WebSocketListener(namespace));

        webSocketCompletionStage.thenAccept(webSocket -> {
            fastAPIWebSocket = webSocket;
            log.info("[WebRTCProxy] Connected to FastAPI server [{}]",FastAPIEndpoint);
        }).exceptionally(ex -> {
            log.error("[WebRTCProxy] Failed to connect to FastAPI server {}", fastAPIWebSocket, ex);
            reconnectToFastAPI();
            return null;
        });
    }

    private void reconnectToFastAPI() {
        reconnectExecutor.schedule(() -> {
            connectToFastAPI();
        }, 5, TimeUnit.SECONDS);
    }
}

이후 위와 같이 WebRTCProxy 함수를 구현하여 Spring 서버가 단순히 메세지들을 포워딩할 수 있는 코드를 작성한다.

현재는 Socket.io와 WebSocket을 하나의 클래스에서 관리하는 것으로 구현하였다.

package com.hearus.hearusspring.common.nettySocketio;

import com.corundumstudio.socketio.SocketIONamespace;
import lombok.extern.slf4j.Slf4j;

import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletionStage;

@Slf4j
public class WebSocketListener implements WebSocket.Listener {
    private final SocketIONamespace namespace;

    public WebSocketListener(SocketIONamespace namespace) {
        this.namespace = namespace;
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        webSocket.sendText("Spring Server", true);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        log.info("[WebSocketListener] Received text from FastAPI server: {}", data);
        namespace.getBroadcastOperations().sendEvent("sttResult", data.toString());
        return null;
    }

    @Override
    public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        return null;
    }

    @Override
    public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        return null;
    }

    @Override
    public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
        log.warn("[WebSocketListener] WebSocket connection to FastAPI server closed: {} - {}", statusCode, reason);
        return null;
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        log.error("[WebSocketListener] WebSocket error from FastAPI server", error);
    }
}

위와 같이 WebSocketListener를 구현하고, FastAPI 서버에 연결한다.

# FastAPI
INFO:     connection open
[WebSocket] BE Client [Spring Server] Accepted
[Whisper] STT Thread Executed

Vue.js

VUE_APP_SOCKETIO_HOST=http://localhost:9092/webrtc

또한 Vue.js의 환경변수를 위와 같이 재 정의하여 Spring 서버에 연결할 수 있다.

const socket = io(socketUrl, {
  path: '/socket.io',
  transports: ['websocket'],
  extraHeaders: {
    'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits'
  },
  reconnection: true,
  reconnectionDelay: 5000,
  reconnectionAttempts: Infinity
});

이때 Socket.iopath'/socket.io'로 선언해주어야 연결할 수 있다.


AudioData Encoding

TroubleShooting

2024-05-08T20:56:02.489+09:00 ERROR 39420 --- [HEARUS-SPRING] [ntLoopGroup-3-3] c.c.socketio.handler.InPacketHandler     : Error during data processing. Client sessionId: de0d2978-542e-4d9e-b5de-9144603a5e40, data: ...

java.lang.IllegalStateException: null
	...
2024-05-08T20:56:02.497+09:00 ERROR 39420 --- [HEARUS-SPRING] [ntLoopGroup-3-3] c.c.s.listener.DefaultExceptionListener  : null
java.lang.IllegalStateException: null
	...

만약 기존의 Express.js와 연결할 때의 방식으로 Vue.js의 MediaRecorder의 AudioBlob을 그대로 전송한다면 위와 같이 Spring 서버에서 해당 데이터를 정상적으로 받아들이지 못한다.

Vue.js

			...
			this.mediaRecorder.ondataavailable = (event) => {
                if (event.data.size > 0 && event.data != null) {
                  const reader = new FileReader();
                  reader.onload = (e) => {
                    // Access ArrayBuffer directly
                    const arrayBuffer = e.target.result;
              
                    if (arrayBuffer) {
                      // Use window.btoa for browser compatibility
                      const base64EncodedData = window.btoa(String.fromCharCode(...new Uint8Array(arrayBuffer)));
                      this.socket.emit('transcription', base64EncodedData);
                    } else {
                      console.error("[mediaRecorder]-[ondataavailable] ArrayBuffer is null");
                    }
                  };
                  reader.readAsArrayBuffer(event.data);
                }
              };     

따라서 위와 같이 audioBlob 데이터를 byte[] 형태
즉, Uint8Array 형태로 변환하고 base64 형식으로 인코딩하여 Spring 서버에 전송한 후 Spring 서버에서 이를 다시 디코딩하고 ByteArray 형식으로 변환하는 방식으로 구현하였다.


WebSocket Configuration

기존 java.net.http.WebSocket 라이브러리를 사용할 경우 주기적인 HealthCheck를 통해 웹 소켓과의 연결을 시도해도 Socket이 약 60초가 지나면 1006에러 코드와 함께 closed되는 문제가 발생하였다.

따라서 java-websocket:Java-WebSocket 라이브러리를 활용하여 WebSocket 연결을 유지할 수 있도록 로직을 수정하였다.

Spring

dependencies {
	// Socket
	implementation 'org.java-websocket:Java-WebSocket:1.5.0'

	// JSON
	implementation group: 'org.json', name: 'json', version: '20231013'
}

먼저 위와 같이 build.gradle에 JSON을 포함한 의존성을 추가한다.

// WebRTCProxy.java
	...
    @Autowired
    public WebRTCProxy(SocketIOServer server, ConfigUtil configUtil)
    {
		...
        connectFastAPI();
    }

	private void connectFastAPI(){
        new Timer().scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if(fastAPIWebSocket == null || fastAPIWebSocket.isClosed()) {
                        fastAPIWebSocket = new WebSocketUtil(
                                new URI(FastAPIEndpoint + "/ws"),
                                new Draft_6455(),
                                namespace
                        );
                        fastAPIWebSocket.connectBlocking();
                        fastAPIWebSocket.send("Spring Server");
                    }
                } catch (Exception e) {
                    // Handle connection exceptions
                    e.printStackTrace();
                }
            }
        }, 0, 60);
    }

이후 connectFastAPI() 메소드에서 scheduleAtFixedRate를 통해 매 60초마다 현재 WebSocket이 연결되어 있는지 여부를 검사하고, 재연결을 시도할 수 있도록 구현하였다.

2024-05-12T19:44:20.554+09:00  INFO 37120 --- [HEARUS-SPRING] [ctReadThread-60] c.h.h.common.webrtcProxy.WebSocketUtil   : [WebSocketUtil] WebSocket connection Closed
java.net.ConnectException: Connection timed out: connect
	at java.base/sun.nio.ch.Net.connect0(Native Method)
	...
2024-05-12T19:44:41.628+09:00  INFO 37120 --- [HEARUS-SPRING] [tReadThread-224] c.h.h.common.webrtcProxy.WebSocketUtil   : [WebSocketUtil] WebSocket connection Closed
2024-05-12T19:44:47.753+09:00  INFO 37120 --- [HEARUS-SPRING] [tReadThread-225] c.h.h.common.webrtcProxy.WebSocketUtil   : [WebSocketUtil] WebSocket connection Opened

위와 같이 FastAPI 서버를 중지했다가 다시 가동해도 정상적으로 재연결하는 모습을 볼 수 있다.

// WebSocketUtil.java
package com.hearus.hearusspring.common.webrtcProxy;

import com.corundumstudio.socketio.SocketIONamespace;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;

@Slf4j
public class WebSocketUtil extends WebSocketClient {
    private final SocketIONamespace namespace;
    public WebSocketUtil(URI serverUri, Draft protocolDraft, SocketIONamespace namespace) {
        super(serverUri, protocolDraft);
        this.namespace = namespace;
    }

    @Override
    public void onMessage(String message) {
        log.info("[WebSocketUtil] Received Messegae {}", message);
    }

    @Override
    public void onOpen( ServerHandshake handshake ) {
        log.info("[WebSocketUtil] WebSocket connection Opened");
    }

    @Override
    public void onClose( int code, String reason, boolean remote ) {
        log.info("[WebSocketUtil] WebSocket connection Closed");
    }

    @Override
    public void onError( Exception ex ) {
        ex.printStackTrace();
    }

}

이후 WebSocketUtil Class를 구현하여 현 단계에서는 FE 클라이언트로부터의 Audio Data가 FastAPI로 전송될 수 있는 인프라를 구현하였고
정상적으로 AudioData가 FastAPI 서버로 포워딩 되는 것을 볼 수 있다.


FFmpeg Audio Transfer

// ffmpeg
implementation group: 'net.bramp.ffmpeg', name: 'ffmpeg', version: '0.7.0'

Vue.js로부터의 데이터를 Whisper에서 사용 가능하도록 변환하기 위해
Spring에서 사용할 수 있는 ffmpeg 라이브러리를 impelment 한다.

package com.hearus.hearusspring.common.ffmpeg;

import lombok.extern.slf4j.Slf4j;
import net.bramp.ffmpeg.FFmpeg;
import net.bramp.ffmpeg.FFmpegExecutor;
import net.bramp.ffmpeg.builder.FFmpegBuilder;
import net.bramp.ffmpeg.job.FFmpegJob;
import org.springframework.stereotype.Component;

import java.io.*;

@Slf4j
@Component
public class AudioConverter {

    public byte[] convertAudio(byte[] decodedBytes) throws Exception {
        // Create an input stream from the decoded bytes
        InputStream inputStream = new ByteArrayInputStream(decodedBytes);

        // Create a temporary file to store the audio data
        File tempInputFile = File.createTempFile("temp_audio_input", ".webm");
        tempInputFile.deleteOnExit();

        // Write the audio data from the InputStream to the temporary file
        try (FileOutputStream fos = new FileOutputStream(tempInputFile)) {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                fos.write(buffer, 0, bytesRead);
            }
        }

        // Create a temporary file to store the converted audio data
        File tempOutputFile = File.createTempFile("temp_audio_output", ".raw");
        tempOutputFile.deleteOnExit();

        // Use FFmpeg to convert the audio data
        FFmpeg ffmpeg = new FFmpeg("src/main/resources/ffmpeg/bin/ffmpeg");
        FFmpegBuilder builder = new FFmpegBuilder()
                .setInput(tempInputFile.getAbsolutePath())
                .overrideOutputFiles(true)
                .addOutput(tempOutputFile.getAbsolutePath())
                .setAudioCodec("pcm_s16le")
                .setAudioChannels(1)
                .setAudioSampleRate(16000)
                .setFormat("s16le")
                .done();

        // Execute the FFmpeg command
        FFmpegExecutor executor = new FFmpegExecutor(ffmpeg);
        FFmpegJob job = executor.createJob(builder);

        job.run();

        // Read the converted audio data from the temporary output file
        byte[] convertedBytes;
        try (FileInputStream fis = new FileInputStream(tempOutputFile)) {
            convertedBytes = fis.readAllBytes();
        }

        return convertedBytes;
    }
}

이후 위와 같이 AudioConverter를 통해 Audio Data를 변환할 수 있게 한다.

이때 Spring Server에서는 FileIO로 임시로 음성 데이터를 저장할 파일을 생성하고
해당 파일을 읽을 FileOutputStream을 생성한다.

이후 FFmpegBuilder를 선언한 뒤 FFmpegExecutor를 통해 MediaRecorder로 녹음된 파일을 Whisper 모델에서 사용할 수 있는 pcm_s16le Codec, s16le 포맷으로 변환하는 작업을 생성한다.

작업이 완료되면 FileInputStream을 통해 convertedBytes로 데이터를 읽어온 후 리턴한다.

...
	private DataListener<String> audioListener() {
        return (client, audioData, ackSender) -> {
            try {
                if (audioData == null) {
                    log.error("[WebRTCProxy]-[Socketio] Audio data is null");
                    return;
                }

                // Decode Base64 string to byte array
                byte[] decodedBytes = Base64.getDecoder().decode(audioData);
                // Convert to codec : pcm_s16le, format : s16le
                byte[] convertedBytes = audioConverter.convertAudio(decodedBytes);

                if (fastAPIWebSocket != null && fastAPIWebSocket.isOpen()) {
                    // Wrap converted bytes in ByteBuffer
                    ByteBuffer byteBuffer = ByteBuffer.wrap(convertedBytes);

                    log.info("[WebRTCProxy]-[Socketio] Forwarding audio data to FastAPI server [{}]", convertedBytes.length);

                    // Send binary data using fastAPISession
                    fastAPIWebSocket.send(byteBuffer);
                } else {
                    log.error("[WebRTCProxy]-[Socketio] FastAPI WebSocket is not connected");
                }
            } catch (Exception ex) {
                log.error("[WebRTCProxy]-[Socketio] Exception", ex);
            }
        };
    }

최종적으로 데이터를 저장하기 이전 AudioConverter를 통해 convertedBytes를 만들어주고
ByteBuffer 형태로 감싼 후 전송하여준다.

Trouble Shooting

2024-05-12T18:50:42.805+09:00  INFO 20896 --- [HEARUS-SPRING] [ntLoopGroup-3-1] net.bramp.ffmpeg.RunProcessFunction      : C:\ffmpeg\bin -version
2024-05-12T18:50:42.806+09:00 ERROR 20896 --- [HEARUS-SPRING] [ntLoopGroup-3-1] c.h.h.common.webrtcProxy.WebRTCProxy     : [WebRTCProxy]-[Socketio] Exception

java.io.IOException: Cannot run program "C:\ffmpeg\bin": CreateProcess error=5, 액세스가 거부되었습니다

이때, 위와 같이 C 드라이브에 설치한 ffmpeg 파일에 대한 접근 오류가 발생한다면
위와 같이 프로젝트의 resources에 ffmpeg를 포함시키고

FFmpeg ffmpeg = new FFmpeg("src/main/resources/ffmpeg/bin/ffmpeg");

FFmpeg 객체를 생성할 때 C 드라이브가 아닌 프로젝트의 경로로 설정해주면
위와 같이 정상적으로 Audio Data가 변환되는 것을 볼 수 있고
해당 데이터를 Whisper 라이브러리에서 텍스트로 변환하는 모습을 볼 수 있다.

git lfs

remote: error: File src/main/resources/ffmpeg/bin/ffmpeg.exe is 129.84 MB; this exceeds GitHub's file size limit of 100.00 MB
remote: error: File src/main/resources/ffmpeg/bin/ffprobe.exe is 129.69 MB; this exceeds GitHub's file size limit of 100.00 MB
remote: error: File src/main/resources/ffmpeg/bin/ffplay.exe is 129.68 MB; this exceeds GitHub's file size limit of 100.00 MB

이때 ffmpeg/bin 내부의 실행 파일들은 각각 100MB가 넘기 때문에
git의 Large File Storage 설정을 통해 Repo에 PUSH 해주어야 한다.

git lfs install

git lfs track "*.exe"
git lfs track "*.ffpreset"

먼저 위와 같이 lfs를 install 해주고 track을 통해 별도로 트랙할 파일을 설정한다.

# .gitattributes
*.exe filter=lfs diff=lfs merge=lfs -text
*.ffpreset filter=lfs diff=lfs merge=lfs -text

이후 위와 같이 .gitattributes가 생성되는 것을 볼 수 있으며

git rm --cached *.exe

$ git push
Uploading LFS objects:  62% (5/8), 196 MB | 27 MB/s
...

만약 이미 add한 파일들이라면 위와 같이 rm --cached <filePath> 캐시를 제거해 주고 다시 addcommit 해 주어야 정상적으로 LFS Object로 인식되어 PUSH되는 것을 볼 수 있다.


Socket.io Client Control

	// WebSocket
    private void connectFastAPI(Timer timer, SocketIOClient client){
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if(fastAPIWebSocket == null || fastAPIWebSocket.isClosed()) {
                        fastAPIWebSocket = new WebSocketUtil(
                                new URI(FastAPIEndpoint + "/ws"),
                                new Draft_6455(),
                                client
                        );
                        fastAPIWebSocket.connectBlocking();
                        fastAPIWebSocket.send("Spring Server");
                    }
                } catch (Exception e) {
                    // Handle connection exceptions
                    log.info("[WebRTCProxy]-[connectFastAPI] WebSocket Connection Failed");
                }
            }
        }, 0, 60);
    }

    // Socketio Listeners
    private ConnectListener onConnected() {
        return client -> {
            HandshakeData handshakeData = client.getHandshakeData();
            log.info("[WebRTCProxy]-[Socketio]-[{}] Connected to WebRTCProxy Socketio through '{}'", client.getSessionId().toString(), handshakeData.getUrl());
            connectFastAPI(timer, client);
        };
    }

    private DisconnectListener onDisconnected() {
        return client -> {
            log.info("[WebRTCProxy]-[Socketio]-[{}] Disconnected from WebRTCProxy Socketio Module.", client.getSessionId().toString());
            if(timer != null) {
                timer.cancel();
                timer.purge();
                timer = null;
            }

            if(fastAPIWebSocket != null)
                fastAPIWebSocket.close();
        };
    }

또한 이후 각 SocketIO를 통해 연결한 Client마다 FastAPI와 연결하는 WebSocket을 하나씩 생성해주어 FastAPI로부터의 STT 결과를 SocketIO 클라이언트로 전송해주기 위하여

위와 같이 onnectFastAPI() 메소드에서 Timer 객체와 SocketIOClient 객체를 받아 SocketIO가 onConnected()될 때 새로운 WebSocket 연결을 하나씩 생성해주었다.

또한 SocketIO가 onDisconnected()될 때 timer 객체를 cancel하여 HealthCheck 및 연결이 해제된 SocketIO 클라이언트의 WebSocket에 대한 재연결을 방지하였으며, fastAPIWebSocket를 close() 해주어 리소스가 낭비되는 것을 방지하였다.

public class WebSocketUtil extends WebSocketClient {
    private final SocketIOClient socketIOClient;
    public WebSocketUtil(URI serverUri, Draft protocolDraft, SocketIOClient socketIOClient) {
        super(serverUri, protocolDraft);
        this.socketIOClient = socketIOClient;
    }

    @Override
    public void onMessage(String message) {
        log.info("[WebSocketUtil] Received Messegae {}", message);
        socketIOClient.sendEvent("transitionResult", message);
    }
    ...

이후 FastAPI로부터의 메세지를 받았을 때 SocketIO의 onConnected() 후 WebSocket을 생성할 때 인자로 전달받은 SocketIOClient에 transitionResult 이벤트로 message를 전달하여 Express.js로 개발하였던 모든 로직을 Spring Boot 서버로 개발하는데 성공하였다.
추후 현재 개발되고 있는 FE와 본격적으로 협업하며 학습 보조 서비스로의 여러 기능들을 추가하고
2024년 상반기 이내에 프로토타입을 출시하여 테스트하는 것을 목표로 하고 있다.


0개의 댓글