PySide, Multi-Processing

moredev·2025년 3월 25일

pyside

목록 보기
3/3

Process?

프로세스(process)는 컴퓨터에서 연속적으로 실행되고 있는 프로그램이다. 프로그램을 구동하여 프로그램 자체와 프로그램의 상태가 메모리 상에서 실행되는 작업 단위를 지칭한다.
멀티프로세싱에서 프로세스들은 parallel(병렬)로 수행된다.

Python GUI

Multi-Processing

  • CPU가 사용되는 연산, 이미지 처리, 파싱 등 CPU bounding 작업이 많을 경우 파이썬에서는 멀티프로세싱 방식이 더 유리하다.
  • 파이썬의 multiprocessing 모듈을 사용하여 멀티프로세싱을 구현할 수 있다.
  • GUI 스레드를 블로킹할 수 있기 때문에 별도의 스레드(QThread)에서 멀티프로세스를 생성한다.

1. 메시지 통신(multiprocessing.Queue)

Queue에 데이터를 삽입/추출하는 방식으로 텍스트 전체 결과를 수신한다.

  • 대기열에 입력되는 모든 데이터는 직렬화된다.
  • 여러 프로세스가 동시에 접근 가능하다.
  • 프로세스 간에 큰 객체가 전달 될 때 느려질 수 있다.

예시: STT 변환

STT 변환 작업은 CPU 연산량이 커 별도의 프로세스로 분리하는 것이 효율적이다.

STT 프로세스

import multiprocessing

import whisper


class STTProcess(multiprocessing.Process):
    def __init__(self, input_queue, output_queue, ready_event):
        super().__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.ready_event = ready_event

    def run(self):
    	# PC 환경에 따라 device 변경
        model = whisper.load_model("base", device="cpu")
        self.ready_event.set()
        print("STT Process Whisper Model Loaded")

        while True:
            meta = self.input_queue.get()
            if meta == "STOP":
                break

            try:
                wav = meta["wav"]
                result = model.transcribe(wav, language="en")
                self.output_queue.put(result["text"].strip())
            except Exception as e:
                print(f"STT Process error: {e}")
                self.output_queue.put(None)

VAD 스레드

from PySide6.QtCore import QThread
import numpy as np
import sounddevice as sd


class VADThread(QThread):
    def __init__(self, stt_input_queue):
        super().__init__()
        self.running = True
        self.stt_input_queue = stt_input_queue
        self.fs = 16000     # samplerate
        self.duration = 3   # 녹음 시간 (초)

    def run(self):
        while self.running:
            audio = sd.rec(int(self.duration * self.fs), samplerate=self.fs,
            			channels=1, dtype='float32')
            sd.wait()

            wav = np.squeeze(audio)
            if np.max(np.abs(wav)) > 0.05:  # 단순 VAD 감지 기준
                self.stt_input_queue.put({"wav": wav})

    def stop(self):
        print(f"VAD Thread stop")
        self.running = False

메인

import sys
import multiprocessing
from multiprocessing import Queue, Event

from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtCore import Signal, QTimer
from PySide6.QtGui import QPixmap

from indexwindow import Ui_MainWindow
from processes.vad_thread import VADThread
from processes.stt_process import STTProcess

class indexWindow(QMainWindow, Ui_MainWindow):
    frame_signal = Signal(QPixmap)  # 애니메이션 프레임을 전달하는 신호

    def __init__(self, stt_process, stt_input_queue, stt_output_queue):
        super(indexWindow, self).__init__()
        self.setupUi(self)

        # process
        self.stt_process = stt_process
        self.stt_input_queue = stt_input_queue
        self.stt_output_queue = stt_output_queue

        # VAD + STT 시작
        self.vad_thread = VADThread(stt_input_queue)
        self.vad_thread.start()

        # STT 결과 polling
        self.timer = QTimer()
        self.timer.timeout.connect(self.check_stt_output)
        self.timer.start(1000)

    def check_stt_output(self):
        while not self.stt_output_queue.empty():
            text = self.stt_output_queue.get()
            self.label.setText(text)

    def closeEvent(self, event):
        self.thread_pool.clear()
        self.thread_pool.waitForDone()
        self.vad_thread.stop()

        event.accept()


def start_stt_process():
    """STT 프로세스 실행"""
    stt_input_queue = Queue()
    stt_output_queue = Queue()
    stt_ready_event = Event()

    stt_process = STTProcess(stt_input_queue, stt_output_queue, stt_ready_event)
    stt_process.start()
    stt_ready_event.wait()

    return stt_process, stt_input_queue, stt_output_queue


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    stt_process, stt_input_queue, stt_output_queue = start_stt_process()

    app = QApplication(sys.argv)
    Window = indexWindow(stt_process, stt_input_queue, stt_output_queue)
    Window.show()

    exit_code = app.exec()

    stt_input_queue.put("STOP")
    stt_process.join()
    
    # 모든 프로세스가 종료된 후 프로그램 종료
    sys.exit(exit_code)

multiprocessing.Event()

프로세스 간의 상태를 공유하고, 그 상태의 변경을 다른 프로세스들이 감지할 수 있도록 하는 동기화 객체. Event 객체는 하나의 상태(flag)를 가지고, set() 혹은 clear()를 사용해 상태를 변경 한다.
STT 프로세스에서 모델이 로드된 후 set()을 호출하여 상태가 변경됨을 알림 -> wait() 중인 메인 프로세스는 set()이 호출된 후 다음 작업을 수행한다.

  • wait(): 프로세스의 작업 대기
  • set(): flag를 True로 변경. 대기 중인 모든 프로세스를 통과
  • clear(): flag를 False로 변경. 다시 대기 상태로 돌아감.

1. 채널 통신(multiprocessing.Pipe)

양방향 또는 단방향으로 데이터를 직접 주고받을 수 있는 채널로, 두 프로세스를 직접 연결

  • 두 프로세스가 서로 데이터를 주고받을 수 있다.
  • 큐 방식보다 오버헤드가 적음
  • 직접 연결이기에 1:1만 가능하다.
  • 실시간 스트리밍 방식에 적합

예시: 스트리밍 텍스트 표시

단어 단위로 결과가 나오는 작업을 실시간으로 GUI에 표시하고자 할 때 적합하다. 예시 코드에서는 지연(time.sleep())을 추가하여 단어 단위로 결과가 나오는 것 처럼 보이게 함.

STT 프로세스

import multiprocessing
import time

import whisper

class STTStreamProcess(multiprocessing.Process):
    def __init__(self, conn, ready_event):
        super().__init__()
        self.conn = conn
        self.ready_event = ready_event

    def run(self):
        model = whisper.load_model("base", device="cpu")
        self.ready_event.set()
        print("STT Process Whisper Model Loaded")

        while True:
            data = self.conn.recv()
            if data == "STOP":
                break

            try:
                wav = data["wav"]
                result = model.transcribe(wav, language="en")
                text = result["text"].strip()

                # 단어 단위로 스트리밍
                for word in text.split():
                    self.conn.send(word)
                    time.sleep(0.5)

                if word and len(word) > 0:
                    self.conn.send("__END__")
            except Exception as e:
                print(f"STT Process error: {e}")

VAD 스레드

from PySide6.QtCore import QThread
import numpy as np
import sounddevice as sd


class VADThread(QThread):
    def __init__(self, conn):
        super().__init__()
        self.running = True
        self.conn = conn
        self.fs = 16000     # samplerate
        self.duration = 3   # 녹음 시간 (초)

    def run(self):
        while self.running:
            audio = sd.rec(int(self.duration * self.fs), samplerate=self.fs,
            channels=1, dtype='float32')
            sd.wait()

            wav = np.squeeze(audio)
            if np.max(np.abs(wav)) > 0.05:  # 단순 VAD 감지 기준
                self.conn.send({"wav": wav})

    def stop(self):
        print(f"VAD Thread stop")
        self.running = False

메인

import sys
import multiprocessing
from multiprocessing import Pipe, Event

from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtCore import Signal, QTimer
from PySide6.QtGui import QPixmap

from indexwindow import Ui_MainWindow
from processes.vad_thread import VADThread
from processes.stt_process import STTStreamProcess

class indexWindow(QMainWindow, Ui_MainWindow):
    frame_signal = Signal(QPixmap)  # 애니메이션 프레임을 전달하는 신호

    def __init__(self, stt_process, conn):
        super(indexWindow, self).__init__()
        self.setupUi(self)
        # process
        self.stt_process = stt_process
        self.conn = conn

        # VAD + STT 시작
        self.vad_thread = VADThread(conn)
        self.vad_thread.start()

        # STT 결과 polling
        self.timer = QTimer()
        self.timer.timeout.connect(self.read_pipe_output)
        self.timer.start(300)


    def read_pipe_output(self):
        while self.conn.poll():
            word = self.conn.recv()
            print(word)
            if word == "__END__":
                self.textEdit.insertPlainText(f"\nEND!\n")
            else:
                self.textEdit.insertPlainText(f"{word} ")


    def closeEvent(self, event):
        self.thread_pool.clear()
        self.thread_pool.waitForDone()
        self.vad_thread.stop()

        self.conn.send("STOP")
        self.stt_process.join()

        event.accept()


def start_stt_process():
    """STT 프로세스 실행"""
    conn_main, conn_child = Pipe()
    stt_ready_event = Event()

    stt_process = STTStreamProcess(conn_child, stt_ready_event)
    stt_process.start()
    stt_ready_event.wait()

    return stt_process, conn_main


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    stt_process, conn_main = start_stt_process()

    app = QApplication(sys.argv)
    Window = indexWindow(stt_process, conn_main)
    Window.show()

    sys.exit(app.exec())

Pipe()의 양방향/단방향

  • duplex가 True면 양방향, False면 단방향으로 기본값은 True이다.

0개의 댓글