프로세스(process)는 컴퓨터에서 연속적으로 실행되고 있는 프로그램이다. 프로그램을 구동하여 프로그램 자체와 프로그램의 상태가 메모리 상에서 실행되는 작업 단위를 지칭한다.
멀티프로세싱에서 프로세스들은 parallel(병렬)로 수행된다.
multiprocessing 모듈을 사용하여 멀티프로세싱을 구현할 수 있다.QThread)에서 멀티프로세스를 생성한다.Queue에 데이터를 삽입/추출하는 방식으로 텍스트 전체 결과를 수신한다.
STT 변환 작업은 CPU 연산량이 커 별도의 프로세스로 분리하는 것이 효율적이다.
import multiprocessing
import whisper
class STTProcess(multiprocessing.Process):
def __init__(self, input_queue, output_queue, ready_event):
super().__init__()
self.input_queue = input_queue
self.output_queue = output_queue
self.ready_event = ready_event
def run(self):
# PC 환경에 따라 device 변경
model = whisper.load_model("base", device="cpu")
self.ready_event.set()
print("STT Process Whisper Model Loaded")
while True:
meta = self.input_queue.get()
if meta == "STOP":
break
try:
wav = meta["wav"]
result = model.transcribe(wav, language="en")
self.output_queue.put(result["text"].strip())
except Exception as e:
print(f"STT Process error: {e}")
self.output_queue.put(None)
from PySide6.QtCore import QThread
import numpy as np
import sounddevice as sd
class VADThread(QThread):
def __init__(self, stt_input_queue):
super().__init__()
self.running = True
self.stt_input_queue = stt_input_queue
self.fs = 16000 # samplerate
self.duration = 3 # 녹음 시간 (초)
def run(self):
while self.running:
audio = sd.rec(int(self.duration * self.fs), samplerate=self.fs,
channels=1, dtype='float32')
sd.wait()
wav = np.squeeze(audio)
if np.max(np.abs(wav)) > 0.05: # 단순 VAD 감지 기준
self.stt_input_queue.put({"wav": wav})
def stop(self):
print(f"VAD Thread stop")
self.running = False
메인
import sys
import multiprocessing
from multiprocessing import Queue, Event
from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtCore import Signal, QTimer
from PySide6.QtGui import QPixmap
from indexwindow import Ui_MainWindow
from processes.vad_thread import VADThread
from processes.stt_process import STTProcess
class indexWindow(QMainWindow, Ui_MainWindow):
frame_signal = Signal(QPixmap) # 애니메이션 프레임을 전달하는 신호
def __init__(self, stt_process, stt_input_queue, stt_output_queue):
super(indexWindow, self).__init__()
self.setupUi(self)
# process
self.stt_process = stt_process
self.stt_input_queue = stt_input_queue
self.stt_output_queue = stt_output_queue
# VAD + STT 시작
self.vad_thread = VADThread(stt_input_queue)
self.vad_thread.start()
# STT 결과 polling
self.timer = QTimer()
self.timer.timeout.connect(self.check_stt_output)
self.timer.start(1000)
def check_stt_output(self):
while not self.stt_output_queue.empty():
text = self.stt_output_queue.get()
self.label.setText(text)
def closeEvent(self, event):
self.thread_pool.clear()
self.thread_pool.waitForDone()
self.vad_thread.stop()
event.accept()
def start_stt_process():
"""STT 프로세스 실행"""
stt_input_queue = Queue()
stt_output_queue = Queue()
stt_ready_event = Event()
stt_process = STTProcess(stt_input_queue, stt_output_queue, stt_ready_event)
stt_process.start()
stt_ready_event.wait()
return stt_process, stt_input_queue, stt_output_queue
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
stt_process, stt_input_queue, stt_output_queue = start_stt_process()
app = QApplication(sys.argv)
Window = indexWindow(stt_process, stt_input_queue, stt_output_queue)
Window.show()
exit_code = app.exec()
stt_input_queue.put("STOP")
stt_process.join()
# 모든 프로세스가 종료된 후 프로그램 종료
sys.exit(exit_code)
프로세스 간의 상태를 공유하고, 그 상태의 변경을 다른 프로세스들이 감지할 수 있도록 하는 동기화 객체. Event 객체는 하나의 상태(flag)를 가지고, set() 혹은 clear()를 사용해 상태를 변경 한다.
STT 프로세스에서 모델이 로드된 후 set()을 호출하여 상태가 변경됨을 알림 -> wait() 중인 메인 프로세스는 set()이 호출된 후 다음 작업을 수행한다.
wait(): 프로세스의 작업 대기set(): flag를 True로 변경. 대기 중인 모든 프로세스를 통과clear(): flag를 False로 변경. 다시 대기 상태로 돌아감.양방향 또는 단방향으로 데이터를 직접 주고받을 수 있는 채널로, 두 프로세스를 직접 연결
단어 단위로 결과가 나오는 작업을 실시간으로 GUI에 표시하고자 할 때 적합하다. 예시 코드에서는 지연(time.sleep())을 추가하여 단어 단위로 결과가 나오는 것 처럼 보이게 함.
import multiprocessing
import time
import whisper
class STTStreamProcess(multiprocessing.Process):
def __init__(self, conn, ready_event):
super().__init__()
self.conn = conn
self.ready_event = ready_event
def run(self):
model = whisper.load_model("base", device="cpu")
self.ready_event.set()
print("STT Process Whisper Model Loaded")
while True:
data = self.conn.recv()
if data == "STOP":
break
try:
wav = data["wav"]
result = model.transcribe(wav, language="en")
text = result["text"].strip()
# 단어 단위로 스트리밍
for word in text.split():
self.conn.send(word)
time.sleep(0.5)
if word and len(word) > 0:
self.conn.send("__END__")
except Exception as e:
print(f"STT Process error: {e}")
from PySide6.QtCore import QThread
import numpy as np
import sounddevice as sd
class VADThread(QThread):
def __init__(self, conn):
super().__init__()
self.running = True
self.conn = conn
self.fs = 16000 # samplerate
self.duration = 3 # 녹음 시간 (초)
def run(self):
while self.running:
audio = sd.rec(int(self.duration * self.fs), samplerate=self.fs,
channels=1, dtype='float32')
sd.wait()
wav = np.squeeze(audio)
if np.max(np.abs(wav)) > 0.05: # 단순 VAD 감지 기준
self.conn.send({"wav": wav})
def stop(self):
print(f"VAD Thread stop")
self.running = False
import sys
import multiprocessing
from multiprocessing import Pipe, Event
from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtCore import Signal, QTimer
from PySide6.QtGui import QPixmap
from indexwindow import Ui_MainWindow
from processes.vad_thread import VADThread
from processes.stt_process import STTStreamProcess
class indexWindow(QMainWindow, Ui_MainWindow):
frame_signal = Signal(QPixmap) # 애니메이션 프레임을 전달하는 신호
def __init__(self, stt_process, conn):
super(indexWindow, self).__init__()
self.setupUi(self)
# process
self.stt_process = stt_process
self.conn = conn
# VAD + STT 시작
self.vad_thread = VADThread(conn)
self.vad_thread.start()
# STT 결과 polling
self.timer = QTimer()
self.timer.timeout.connect(self.read_pipe_output)
self.timer.start(300)
def read_pipe_output(self):
while self.conn.poll():
word = self.conn.recv()
print(word)
if word == "__END__":
self.textEdit.insertPlainText(f"\nEND!\n")
else:
self.textEdit.insertPlainText(f"{word} ")
def closeEvent(self, event):
self.thread_pool.clear()
self.thread_pool.waitForDone()
self.vad_thread.stop()
self.conn.send("STOP")
self.stt_process.join()
event.accept()
def start_stt_process():
"""STT 프로세스 실행"""
conn_main, conn_child = Pipe()
stt_ready_event = Event()
stt_process = STTStreamProcess(conn_child, stt_ready_event)
stt_process.start()
stt_ready_event.wait()
return stt_process, conn_main
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
stt_process, conn_main = start_stt_process()
app = QApplication(sys.argv)
Window = indexWindow(stt_process, conn_main)
Window.show()
sys.exit(app.exec())
duplex가 True면 양방향, False면 단방향으로 기본값은 True이다.