Zero Message Queue

nana·2023년 1월 16일
0

네트워크

목록 보기
6/12

프로토콜의 위치

과거 전통적인 인터넷 프로토콜은 운영체제 내부 커널에 위치했지만, 현대 인터넷 프로토콜은 운영체제 상위의 어플리케이션 계층으로 제공한다.

위치에 따른 장단점

  • 성능 측면(datarate; 단위 시간 내에 계층 간 데이터 전송 속도, memory; 메모리에 읽고 쓸 때 걸리는 시간) : 운영체제 내부 > 어플리케이션 계층
  • 개발 및 발전 용이성 측면 : 커널에 존재하면 새로운 통신 프로토콜을 주입하기 어렵고 운영체제를 관리하는 회사에 버전 업데이트 여부가 달려 있으나, application layer에 존재하면 여러 개발자들이 함께 오픈소스화 해서 개발할 수 있으며 버전 업데이트가 빠르다. => 운영체제 내부 < 어플리케이션 계층

전통적인 인터넷 프로토콜(TCP/UDP/IP)

운영체제 내부에 존재

Socket API

  • 커널과 응용 프로그램을 연결하여 커널 내부의 TCP/UDP/IP와 통신할 수 있도록 해준다.
  • Linux나 Unix를 포함한 POSIX 계열의 운영체제는 소켓을 file descriptor로 어플리케이션에 노출한다.
  • POSIX 계열의 운영체제에서 소켓은 파일의 한 종류이다.(POSIX 계열의 운영체제에서는 대부분의 하드웨어, 소프트웨어 자원이 파일이다)
  • 운영체제의 커널 내부에 위치한 통신 소프트웨어를 운영체제의 자원처럼 사용하기 때문에, POSIX 계열의 운영체제의 커널에 접근할 때 사용하는 파일 함수(파일 읽기/쓰기)를 통신할 때에도 사용한다.

ZeroMQ(Zero Message Queue)

ZeroMQ(ØMQ, 0MQ, zmq)

어플리케이션 계층의 메시징 라이브러리로, 운영체제 위 어플리케이션 계층에 존재한다.(일부는 커널에 존재하는 TCP/IP를 사용하기도 한다.)
ZeroMQ는 컴퓨터 간(Inter-node, TCP/IP 사용), 프로그램 간(Inter-process, named pipe 사용), 스레드 간(Inter-thread, 메모리 사용) 통신을 모두 지원한다. -> 운영체제를 통해서 정보를 주고 받는 여러 기법들을 쉽고 편하게 제공할 수 있도록 만들어졌다.
Type : Message queue, concurrency framework(병렬처리)

참고) named pipe : 프로세스 간 통신(IPC) 기법 중 하나로, Unix 및 Unix 계열의 파이프(두 프로세스가 생산자-소비자 모델에 따라 통신할 수 있게 해주는 원형 버퍼. 한 프로세스가 쓰고 다른 프로세스가 읽는 FIFO 형태의 Queue)를 확장한 것이다.

ZeroMQ 특징 (from website)

  • Universal : 다양한 언어들과 운영체제들을 지원한다.
  • Smart : pub-sub(publish-subscribe; 게시자가 이벤트를 브로드캐스트하여 구독자와 비동기적으로 통신하는 방법으로, 게시자가 이벤트 처리 방식, 시기와 관계없이 pub/sub 서비스에 이벤트를 보내면 pub/sub은 이벤트에 응답하는 모든 서비스에 이벤트를 전송한다.), push-pull(필요할 때 정보를 보내고, 업데이트된 정보가 있고 읽을 의향이 있다면 정보를 가져오는 방식), client-server(1:1, 1:N)와 같은 smart pattern들을 지원한다.
  • Multi-Transport : IPC, TCP, UDP, TIPC, WebSocket, multicast 위에서도 돌아간다.
  • High-speed : 비동기적으로 양방향 통신이 가능하다.
  • Community : 오픈소스 커뮤니티가 존재한다.
  • The Guide : ZeroMQ를 사용하는 방법과 예제들이 제공된다.

ZeroMQ 특징

  • zero : zero broker(brokerless; 반드시 있어야 할 서버인 broker가 필요하지 않다), zero latency(속도가 빠르다), zero cost(free; 비용이 들지 않는다), zero administration(분산 처리 시 관리자가 필요하지 않다)
    비동기식 메시징 라이브러리이다.
  • 분산화된 응용 프로그램에 유용하다. (distributed or concurrent)
  • 중간에 통과해야하는 broker가 존재하지 않는다.
  • Berkeley socket API의 일부를 활용하였다.
  • Request/reply(1:1, 1:N), publish/subscriber 등의 여러 messaging pattern들을 지원한다.
  • TCP/UDP, in-process(프로세스 내), inter-process(프로세스 간), multicast, web socket 등의 여러 transport들 위에서 돌아간다.
  • 28개 언어들과 750개의 예제들을 제공한다. (2021.7월 기준)

ZeroMQ Pattern

  • Request-reply : 전형적인 클라이언트-서버 구조로, 1:1, 1:N, M:N이 가능하다. (REQ, REP, DEALER, ROUTER)
  • Pub-sub : 이벤트 발생 시 공지하는 방식으로, 데이터를 만들어 뿌리는 형태이다. (PUB, SUB, XPUB, XSUB)
  • Pipeline : 분배하여 작업을 수행한 후 다시 모아서 처리하는 방식으로, 분배하는 것을 fan-out, 모으는 것을 fan-in이라고 한다. (PULL; 필요한 데이터를 가져다 쓸 때, PUSH; 작업을 수행한 결과를 밀어 넣을 때)
  • Exclusive pair : 두 socket을 독점적으로 연결하는 방식으로, 프로세스 내 두 스레드를 연결할 때 사용된다.

일반적인 socket vs ZeroMQ socket

일반적인 socket

  • connection-oriented 방식(SOCK_STREAM)이나 connectionless 방식(SOCK_DGRAM)을 사용하는 동기적인 인터페이스로, byte 단위의 segment나 datagram을 전송한다.
  • 1:1, N:1, 1:N(multicast) 모두 지원한다.

ZeroMQ socket

  • 비동기적인 통신이 가능한 message queue로, 메시지들을 전송한다.
  • M:N 통신도 지원한다.

ZeroMQ socket

  • 비동기성 : 물리적인 연결(OSI 1,2계층)과 상관 없이 어플리케이션을 짤 수 있다.(연결이 끊어지면 연결이 되기까지 기다렸다가, 다시 연결이 되면 메시지를 전달한다)
  • socket을 생성하고 제거할 수 있다.
  • socket에 옵션을 설정할 수 있다. (전달받는 데이터 필터링 가능. 예: 날씨 데이터에서 서울 정보만 추출)
  • 네트워크 토폴로지에 맞춰 socket을 연결할 수 있다.
  • socket을 사용하여 데이터를 주고 받을 수 있다.

ZermoMQ socket의 bind & connect

  • 일반적으로 socket API는 서버가 Bind, 클라이언트가 Connect를 사용한다.
  • ZeroMQ는 Connection마다 queue를 만들어 사용한다.
  • ZeroMQ는 프로그램의 시작부터 끝까지 오래 살아있는 경우에는 Bind를, 단기간에 사용되는 경우에는 Connect를 사용한다. (서버가 존재하지 않을 수 있음)

Bind

다른 프로그램들보다 오래 살아 있거나, 동적인 프로그램들과 연결하는 정적인 프로그램의 경우에 사용한다.

Connect

동적인 프로그램들이나 디바이스들이 사용한다.
즉, 프로그램의 시간에 따라 connect와 bind가 다르게 사용될 수 있다.

참고) RabbitMQ (open source message broker)
중앙 집중형 Broker 기반의 message queue. 예) 단체 공지 메일, 재난문자 등

ZeroMQ pattern

Request-Reply pattern

  • 일반적인 socket에서의 client/server 모델
  • 클라이언트 입장에서 보면 동기적으로(synchronous) 동작한다 (REQ를 보내면 서버가 REP를 보낼 때까지 기다림) - 1:1 일 때
  • 1:N도 별도의 수정 없이 가능하다
import time
import zmq

context = zmq.Context()
# 요청에 응답하는 REP 패턴의 socket을 연다
socket = context.socket(zmq.REP)
# TCP를 사용하며, 현재 IP 주소와 포트 번호 5555로 사용할 것이다
socket.bind("tcp://*:5555")

while True:
    message = socket.recv()
    print("Received request: %s" % message)

    time.sleep(1)

    socket.send(b"World")
import zmq

context = zmq.Context()

print("Connecting to hello world server…”)
# 요청을 보내는 REQ 패턴의 socket을 연다
socket = context.socket(zmq.REQ)
# TCP를 사용하며, localhost와 포트 번호 5555로 사용할 것이다
socket.connect("tcp://localhost:5555")

for request in range(10):
    print("Sending request %s …" % request)
    socket.send(b”Hello")

   message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

Publish-Subscribe pattern

  • 정보를 생성하고 배포하는 publisher와 정보를 받는 subscriber들로 구성되어 있다.
  • 단방향으로 데이터가 배포된다. (Client(SUB)들은 recv()만 할 수 있으며 send()는 불가능하다. 서비스는 PUB socket에 send()만 가능하며 recv()는 불가능하다.)
  • 비동기적으로 동작한다.
  • publisher가 정보를 push하면, subscriber들이 받는 구조이다.

예) publisher는 날씨 예보 데이터들을 모두 제공하고, subscriber는 특정 우편번호의 데이터만 필터해서 받는다.

import zmq
from random import randrange

print("Publishing updates at weather server...") 

context = zmq.Context()
# PUB 패턴으로 socket을 연다
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string(f"{zipcode} {temperature} {relhumidity}")
import sys
import zmq

context = zmq.Context()
# SUB 패턴으로 socket을 연다
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")

# 실행 시 입력 파라미터로 특정 우편번호를 전달한 경우 해당 우편번호를, 그렇지 않은 경우에는 10001을 zip_filter에 넣는다 
zip_filter = sys.argv[1] if len(sys.argv) > 1 else10001"
# zip_filter에 해당하는 것만 SUBSCRIBE 패턴으로 받을 수 있도록 설정
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

total_temp = 0
for update_nbr in range(20):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

    print(f"Receive temperature for zipcode "
       f"'{zip_filter}' was {temperature} F")

print((f"Average temperature for zipcode " 
       f"'{zip_filter}' was {total_temp / (update_nbr+1)} F"))

Pipeline pattern (Push pull pattern)

  • 하나의 일을 여러 개로 쪼개서 분배(fan-out; 1:N)하고 각각의 결과를 취합해서 하나로 만드는(fan-in; N:1) 패턴이다.(fan-out/fan-in pattern)
  • 병렬적으로 작업을 분배하고 수집하는 경우에 사용된다.
  • 각각의 작업이 완료 된 다음, 결과를 받을 때(pull) 알고리즘을 통해 순서를 조절할 수 있다.(직접 구현 가능)

Publish-Subscribe with Pipeline pattern(Push/pull pattern)

하나의 실행 프로세스는 여러 개의 ZeroMQ 패턴을 가질 수 있다.

  • Server
    - PUB과 PULL 기능을 가지고 있다.
    - 구동 시 publish와 pull 서버 기능을 활성화한다.
    - Client들이 주기적으로 보고하는 상태 정보를 pull한 후, 다시 모든 client들에게 publish한다

  • Client
    - SUB과 PUSH 기능을 가지고 있다.
    - 주기적으로 자신의 상태 정보를 Server에게 push한다.
    - Server가 publish한 전체 client들의 상태를 subscribe하여 수신한다.

import zmq

def main():
    ctx = zmq.Context()
    # PUB 패턴으로 socket을 연다
    publisher = ctx.socket(zmq.PUB)
    publisher.bind(“tcp://*:5557")
    # PULL 패턴으로 socket을 연다
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    while True:
        message = collector.recv()
        print("I: publishing update ", message)
        publisher.send(message)

if __name__ == '__main__':
    main()
import random
import time

import zmq
import sys

def main(argue):

    ctx = zmq.Context()
    # SUB 패턴으로 socket을 연다
    subscriber = ctx.socket(zmq.SUB)
    # b’’는 bytes 문자열로, 모든 문자열을 받겠다는 의미이단
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect(“tcp://localhost:5557")
    # PUSH 패턴으로 socket을 연다
    publisher = ctx.socket(zmq.PUSH)
    publisher.connect("tcp://localhost:5558")

    clientID = argv[1]
    random.seed(time.time())
    while True:
        # 100ms 마다 주기적으로 수신 버퍼에 데이터가 존재하는지 확인한다
        # zmq.POLLIN은 수신 버퍼에 데이터가 있으면 true, 없으면 false인 flag이다
        if subscriber.poll(100) & zmq.POLLIN:
            message = subscriber.recv()
            print("I: received message ", message)
        else:
            rand = random.randint(1, 100)
            if rand < 10:
                time.sleep(1)
                msg = "(" + clientID + ":ON)"
                publisher.send_string(msg)
                print("{0}: send status - activated".format(clientID))
            elif rand > 90:
                time.sleep(1)
                msg = "(" + clientID + ":OFF)"
                publisher.send_string(msg)
                print("{0}: send status - deactivated".format(clientID))
if __name__ == '__main__':
    main(sys.argv)

Dealer-Router pattern

  • 1:N과 N:1이 섞여있을 때 주로 사용하며, 비동기적인 REQ-REP 패턴이라고 볼 수 있다.
  • N:1 -> 여러 client들이 하나의 server에 비동기적으로 요청을 보낸다.
  • 1:N -> 하나의 서버가 여러 worker들에게 비동기적으로 작업을 분배한다.
  • Client와 worker는 생겼다가 없어질 수도 있기 때문에 connect를, server는 계속해서 유지될 수 있기 때문에 bind를 한다.
  • Client는 응답을 기다리지 않고 여러 개의 요청을 보낼 수 있으며, Server는 요청을 기다리지 않고 여러 개의 응답을 보낼 수 있다.
  • Worker가 동일하다면 fair queuing 방식으로 load balancing한다.

zmq.proxy(frontend, backend) : frontend로 오는 것을 backend로, backend로 오는 것을 frontend로 전달하도록 설정하는 함수.
inproc : in-process. TCP/UDP보다 경량화된 통신 방식으로, 컴퓨터 내 메모리 등을 사용해서 데이터를 주고 받는 방식이다.

import zmq
import sys
import threading
import time
from random import randint, random

class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self, num_server):
        threading.Thread.__init__ (self)
        self.num_server = num_server

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.DEALER)
        backend.bind('inproc://backend')

        workers = []
        for i in range(self.num_server):
            worker = ServerWorker(context, i)
            worker.start()
            workers.append(worker)

        zmq.proxy(frontend, backend) 

        frontend.close()
        backend.close()
        context.term()

class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context, id):
        threading.Thread.__init__ (self)
        self.context = context
        self.id = id

    def run(self):
        worker = self.context.socket(zmq.DEALER)
        worker.connect('inproc://backend')
        print('Worker#{0} started'.format(self.id))
        while True:
            ident, msg = worker.recv_multipart()
            print('Worker#{0} received {1} from {2}'.format(self.id, msg, ident))
            worker.send_multipart([ident, msg])

        worker.close() # useless

def main(argv):
    """main function"""
    server = ServerTask(int(argv[1]))
    server.start()
    server.join()

# worker의 수를 입력 파라미터로 전달해줘야 한다
if __name__ == "__main__":
    main(sys.argv)
import zmq
import sys
import threading
import time
from random import randint, random

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        identity = u'%s' % self.id
        socket.identity = identity.encode('ascii')
        socket.connect('tcp://localhost:5570')
        print('Client %s started' % (identity))
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            reqs = reqs + 1
            print('Req #%d sent..' % (reqs))
            socket.send_string(u'request #%d' % (reqs))

            time.sleep(1)
            sockets = dict(poll.poll(1000))
            if socket in sockets:
                msg = socket.recv()
                print('{0} received: {1}'.format(identity, msg))

        socket.close() #useless
        context.term() #useless

def main(argv):
    """main function"""
    client = ClientTask(argv[1])
    client.start()

# client_id를 입력 파라미터로 전달해줘야 한다
if __name__ == "__main__":
    main(sys.argv)
profile
언젠가 개발자

0개의 댓글

관련 채용 정보