[분산 시스템] 1-5. Prod and Cons Using Queue

gimseonjin616·2021년 12월 27일
0

분산시스템

목록 보기
6/14

Queue란

큐(queue)는 컴퓨터의 기본적인 자료 구조의 한가지로, 먼저 집어 넣은 데이터가 먼저 나오는 FIFO(First In First Out)구조로 저장하는 형식을 말한다.

영어 단어 queue는 표를 사러 일렬로 늘어선 사람들로 이루어진 줄을 말하기도 하며, 먼저 줄을 선 사람이 먼저 나갈 수 있는 상황을 연상하면 된다.

프린터의 출력 처리나 윈도 시스템의 메시지 처리기, 프로세스 관리 등 데이터가 입력된 시간 순서대로 처리해야 할 필요가 있는 상황에 이용된다.

파이썬에서는 queue 라이브러리를 통해 쉽게 Queue를 구현할 수 있다.

# 파이썬 queue 구현 코드

import queue 

pipeline = queue.Queue(maxsize=10)

Python Event 객체

python에서는 스레드간의 간단한 통신으로 활용할 수 있는 Event 객체를 제공한다.

이벤트 객체의 기능은 다음과 같다.

  • is_set() : 내부 플래그가 True면 그때만 True 반환

  • set() : 내부 플래그를 True로 설정

  • clear() : 내부 플래그를 False로 설정

  • wait(timeout=None) : 내부 플래그가 True가 될때까지 blocking함.

Producer & Consumer pattern

Producer & Consumer pattern은 멀티 스레드에서 가장 자주 쓰이는 디자인 패턴이다. 서버 프로그램의 핵심이며 주로 허리 역할에 해당한다.

Producer는 자신이 필요한 요청을 Consumer와 공유하고 있는 Queue에 담는다. 그 후 Consumer는 자신이 요청을 처리할 수 있는 상태일 때, Queue에 담긴 요청을 가져와서 처리한다.

이때 중요한 점은 Producer는 Queue에 값을 넣어놓고 그 결과를 확인하지 않는다. 즉 비동기 방식으로 진행된다. 따라서 사용할 때 예외처리에 조심해야한다.

Producer & Consumer 코드

Producer

# 생산자
def producer(queue, event):
    """네트워크 대기 상태라 가정(서버)"""
    while not event.is_set():
        message = random.randint(1, 11)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

Consumer

# 소비자
def consumer(queue, event):
    """응답 받고 소비하는 것으로 가정 or DB 저장"""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

Main 스레드 코드

Main

if __name__ == "__main__":
    # Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    # 사이즈 중요
    pipeline = queue.Queue(maxsize=10)

    # 이벤트 플래그 초기 값 0
    event = threading.Event()

    # With Context 시작
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        # 실행 시간 조정
        time.sleep(0.1)

        logging.info("Main: about to set event")
        
        # 프로그램 종료
        event.set()

결과

0.1초간 실행된 결과가 많아서 일부만 가져왔다. 여기서 보면 Python의 GIL에 대해 알고있어야 한다. GIL은 한번에 한 스레드만 사용할 수 있다는 개념이다.

우선 Producer 스레드가 값을 큐에 채워넣는다. 그 후 Consumer에게 자원 배정이 되면 큐에 쌓인 데이터를 처리한다.

그렇기 때문에 Main 스레드에서 Event 상태를 set으로 바꾸면 먼저 실행된 Producer가 먼저 끝나는 것이 아니라 Consumer에 배정된 시간이 다 끝나야 Producer 스레드가 끝이 나는 것이다.

profile
to be data engineer

0개의 댓글