과거 전통적인 인터넷 프로토콜은 운영체제 내부 커널에 위치했지만, 현대 인터넷 프로토콜은 운영체제 상위의 어플리케이션 계층으로 제공한다.
운영체제 내부에 존재
어플리케이션 계층의 메시징 라이브러리로, 운영체제 위 어플리케이션 계층에 존재한다.(일부는 커널에 존재하는 TCP/IP를 사용하기도 한다.)
ZeroMQ는 컴퓨터 간(Inter-node, TCP/IP 사용), 프로그램 간(Inter-process, named pipe 사용), 스레드 간(Inter-thread, 메모리 사용) 통신을 모두 지원한다. -> 운영체제를 통해서 정보를 주고 받는 여러 기법들을 쉽고 편하게 제공할 수 있도록 만들어졌다.
Type : Message queue, concurrency framework(병렬처리)
참고) named pipe : 프로세스 간 통신(IPC) 기법 중 하나로, Unix 및 Unix 계열의 파이프(두 프로세스가 생산자-소비자 모델에 따라 통신할 수 있게 해주는 원형 버퍼. 한 프로세스가 쓰고 다른 프로세스가 읽는 FIFO 형태의 Queue)를 확장한 것이다.
다른 프로그램들보다 오래 살아 있거나, 동적인 프로그램들과 연결하는 정적인 프로그램의 경우에 사용한다.
동적인 프로그램들이나 디바이스들이 사용한다.
즉, 프로그램의 시간에 따라 connect와 bind가 다르게 사용될 수 있다.
참고) RabbitMQ (open source message broker)
중앙 집중형 Broker 기반의 message queue. 예) 단체 공지 메일, 재난문자 등
import time
import zmq
context = zmq.Context()
# 요청에 응답하는 REP 패턴의 socket을 연다
socket = context.socket(zmq.REP)
# TCP를 사용하며, 현재 IP 주소와 포트 번호 5555로 사용할 것이다
socket.bind("tcp://*:5555")
while True:
message = socket.recv()
print("Received request: %s" % message)
time.sleep(1)
socket.send(b"World")
import zmq
context = zmq.Context()
print("Connecting to hello world server…”)
# 요청을 보내는 REQ 패턴의 socket을 연다
socket = context.socket(zmq.REQ)
# TCP를 사용하며, localhost와 포트 번호 5555로 사용할 것이다
socket.connect("tcp://localhost:5555")
for request in range(10):
print("Sending request %s …" % request)
socket.send(b”Hello")
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
예) publisher는 날씨 예보 데이터들을 모두 제공하고, subscriber는 특정 우편번호의 데이터만 필터해서 받는다.
import zmq
from random import randrange
print("Publishing updates at weather server...")
context = zmq.Context()
# PUB 패턴으로 socket을 연다
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
zipcode = randrange(1, 100000)
temperature = randrange(-80, 135)
relhumidity = randrange(10, 60)
socket.send_string(f"{zipcode} {temperature} {relhumidity}")
import sys
import zmq
context = zmq.Context()
# SUB 패턴으로 socket을 연다
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")
# 실행 시 입력 파라미터로 특정 우편번호를 전달한 경우 해당 우편번호를, 그렇지 않은 경우에는 10001을 zip_filter에 넣는다
zip_filter = sys.argv[1] if len(sys.argv) > 1 else “10001"
# zip_filter에 해당하는 것만 SUBSCRIBE 패턴으로 받을 수 있도록 설정
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
total_temp = 0
for update_nbr in range(20):
string = socket.recv_string()
zipcode, temperature, relhumidity = string.split()
total_temp += int(temperature)
print(f"Receive temperature for zipcode "
f"'{zip_filter}' was {temperature} F")
print((f"Average temperature for zipcode "
f"'{zip_filter}' was {total_temp / (update_nbr+1)} F"))
하나의 실행 프로세스는 여러 개의 ZeroMQ 패턴을 가질 수 있다.
Server
- PUB과 PULL 기능을 가지고 있다.
- 구동 시 publish와 pull 서버 기능을 활성화한다.
- Client들이 주기적으로 보고하는 상태 정보를 pull한 후, 다시 모든 client들에게 publish한다
Client
- SUB과 PUSH 기능을 가지고 있다.
- 주기적으로 자신의 상태 정보를 Server에게 push한다.
- Server가 publish한 전체 client들의 상태를 subscribe하여 수신한다.
import zmq
def main():
ctx = zmq.Context()
# PUB 패턴으로 socket을 연다
publisher = ctx.socket(zmq.PUB)
publisher.bind(“tcp://*:5557")
# PULL 패턴으로 socket을 연다
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")
while True:
message = collector.recv()
print("I: publishing update ", message)
publisher.send(message)
if __name__ == '__main__':
main()
import random
import time
import zmq
import sys
def main(argue):
ctx = zmq.Context()
# SUB 패턴으로 socket을 연다
subscriber = ctx.socket(zmq.SUB)
# b’’는 bytes 문자열로, 모든 문자열을 받겠다는 의미이단
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
subscriber.connect(“tcp://localhost:5557")
# PUSH 패턴으로 socket을 연다
publisher = ctx.socket(zmq.PUSH)
publisher.connect("tcp://localhost:5558")
clientID = argv[1]
random.seed(time.time())
while True:
# 100ms 마다 주기적으로 수신 버퍼에 데이터가 존재하는지 확인한다
# zmq.POLLIN은 수신 버퍼에 데이터가 있으면 true, 없으면 false인 flag이다
if subscriber.poll(100) & zmq.POLLIN:
message = subscriber.recv()
print("I: received message ", message)
else:
rand = random.randint(1, 100)
if rand < 10:
time.sleep(1)
msg = "(" + clientID + ":ON)"
publisher.send_string(msg)
print("{0}: send status - activated".format(clientID))
elif rand > 90:
time.sleep(1)
msg = "(" + clientID + ":OFF)"
publisher.send_string(msg)
print("{0}: send status - deactivated".format(clientID))
if __name__ == '__main__':
main(sys.argv)
zmq.proxy(frontend, backend) : frontend로 오는 것을 backend로, backend로 오는 것을 frontend로 전달하도록 설정하는 함수.
inproc : in-process. TCP/UDP보다 경량화된 통신 방식으로, 컴퓨터 내 메모리 등을 사용해서 데이터를 주고 받는 방식이다.
import zmq
import sys
import threading
import time
from random import randint, random
class ServerTask(threading.Thread):
"""ServerTask"""
def __init__(self, num_server):
threading.Thread.__init__ (self)
self.num_server = num_server
def run(self):
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind('tcp://*:5570')
backend = context.socket(zmq.DEALER)
backend.bind('inproc://backend')
workers = []
for i in range(self.num_server):
worker = ServerWorker(context, i)
worker.start()
workers.append(worker)
zmq.proxy(frontend, backend)
frontend.close()
backend.close()
context.term()
class ServerWorker(threading.Thread):
"""ServerWorker"""
def __init__(self, context, id):
threading.Thread.__init__ (self)
self.context = context
self.id = id
def run(self):
worker = self.context.socket(zmq.DEALER)
worker.connect('inproc://backend')
print('Worker#{0} started'.format(self.id))
while True:
ident, msg = worker.recv_multipart()
print('Worker#{0} received {1} from {2}'.format(self.id, msg, ident))
worker.send_multipart([ident, msg])
worker.close() # useless
def main(argv):
"""main function"""
server = ServerTask(int(argv[1]))
server.start()
server.join()
# worker의 수를 입력 파라미터로 전달해줘야 한다
if __name__ == "__main__":
main(sys.argv)
import zmq
import sys
import threading
import time
from random import randint, random
class ClientTask(threading.Thread):
"""ClientTask"""
def __init__(self, id):
self.id = id
threading.Thread.__init__ (self)
def run(self):
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = u'%s' % self.id
socket.identity = identity.encode('ascii')
socket.connect('tcp://localhost:5570')
print('Client %s started' % (identity))
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
reqs = 0
while True:
reqs = reqs + 1
print('Req #%d sent..' % (reqs))
socket.send_string(u'request #%d' % (reqs))
time.sleep(1)
sockets = dict(poll.poll(1000))
if socket in sockets:
msg = socket.recv()
print('{0} received: {1}'.format(identity, msg))
socket.close() #useless
context.term() #useless
def main(argv):
"""main function"""
client = ClientTask(argv[1])
client.start()
# client_id를 입력 파라미터로 전달해줘야 한다
if __name__ == "__main__":
main(sys.argv)