Queue 사용 X
모든 작업이 다 끝났는지 검사를 하기 위해서 추가로 done_queue에 대해 바쁜 대기(busy waiting)를 수행
run메서드가 루프를 무한히 반복
파이프라인 진행이 막힐 경우 프로그램이 임의로 중단된다.
Queue 사용 O
새로운 데이터가 나타날 때까지 get 메서드가 블록되게 하여 작업자의 바쁜 대기 문제를 해결
파이프라인이 중간에 막히지 않는다.
두 단계 사이에 허용 가능한 미완성 작업의 최대 개수 지정
버퍼 크기를 정하면 큐가 이미 가득 찬 경우 put 블록
센티널 원소를 추가하는 close 메서드 정의
join()메서드를 활용하면 병렬성을 노이게 되어서프로그램의 속도 증가
#위와 같은 문제를 queue 내장 모듈에 있는 Queue 클래스는 앞에서 설명한 모든 문제 해결
from queue import Queue
from threading import Thread
import time
my_queue = Queue()
def consumer():
print('소비자 대기')
my_queue.get() #다음에 보여줄 put()가 실행이 된 후에 실행
print('소비자 완료')
thread = Thread(target=consumer)
thread.start()
print('생산자 데이터 추가')
my_queue.put(object())
print('생산자 완료')
소비자 대기
생산자 데이터 추가
생산자 완료소비자 완료
#큐 작업 진행 감시 및 적당한 횟수 호출
class CloseableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTTINEL:
return #스레드를 종료
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
def start_threads(count, *args):
threads = [StoppableWorker(*args) for _ in range(count)]
for thread in threads:
thread.start()
return threads
def stop_threads(closable_queue, threads):
for _ in threads:
closable_queue.close()
closable_queue.join()
for thread in threads:
thread.join()
순차적인 작업을 동시에 여러 파이썬 스레드에서 실행되도록 조작하고 싶을 때, 특히 I/O위주의 프로그램인 경우라면 파이프라인이 매우 유용하다.
동시성 파이프라인을 만들 때 발생할 수 있는 여러가지문제(바쁜 대기, 작업자에게 종료를 알리는 방법, 잠재적인 메모리 사용량 폭발 등)를 잘 알아두라.
Queue 클래스는 튼튼한 파이프라인을 구축할 때 필요한 기능인 블로킹 연산, 버퍼 크기 지정, join을 통한 완료 대기를 모두 제공