Service Bus를 사용할 때, 어떻게 at least once 같은 부분들이 처리되는지 궁금해본적이 있었다.
그러다가 깊이 보게된 부분이 receive mode이다.
안에서 어떻게 처리 되길래 최소 한번 이상 실행을 보장하는거야!?
잠시... 보장 해주는거 맞지?
Service Bus는 msg를 pulling 해갈 때, Peek-lock과, Receive and delte라는 2가지 방법을 제시한다.
이번 글은 간략하게 나마 2가지 모드 및 각 종옵션들과 태가 테스트 했던 코드를 기록해 놓으려고 한다.
간략하게 말하자면, msg를 consumer가 가져오기 전, 가져올 메시지를 lock시켜 다른 consumer들이 가져가지 못하게 한 후, msg를 들고오는 방식이다.
이는 2가지의 단계로 진행 되어진다.
만약 application이 msg를 어떠한 이유로 처리하지 못하는 경우, Service Bus는 해당 msg를 unlock하고, 같은 consumer 혹은 다른 consumer가 다시 받을 수 있게 만들어 준다.
Lock timeout이라는 시간이 있는데, 해당 시간 내에 처리하지 못하면, 위와 같이 unlock 후, 같은 consumer 혹은 다른 consumer가 다시 받을 수 있게 한다.
msg를 처리하고, Service Bus에 complete 처리 해달라는 요청을 보내기 전에, application이 crash 되는 경우, Service Bus는 해당 메시지를 application이 다시 시작할 때 재전달한다. 이러한 process를 at least once processing이라고 부른다. 같은 메시지가 재전달 되어 2번 처리될 수 있기에, application에서 추가적인 logic을 넣어, duplicate 되는 부분들을 탐지해야한다.
위와 같이 2번씩 처리되는 부분을 방지하기 위해서는, MessageID를 이용하여 새로운 MessageID가 time window에 로깅 되어 있다면, accepted된 msg라고 판단하면 된다.
따라서 Peek-lock은 모든 메시지들이 최소 한번이상 무조건 처리되어야 하는 application에서 사용하면 된다.
이 모드는, consumer에게 요청을 받게 되면, 해당 msg를 consumed 되었다고 mark 하고 consumer application에 msg를 return한다.
가장 간단한 모데링며, 모든 msg가 전부 처리되지 않아도 되는 application에서 사용하면 된다.
Peek-lock이 default 옵션이다.
mode를 ReceiveAndDelete mode로 설정을 해야 해당 모드로 사용 가능하다.
위의 mode중 Receive and delete 방식은, 애초에 consume 마크하고 보내니, 여러 consumer가 같은 queue를 처리할 일은 없다.
그러니 우린 Peek-lock을 테스트해보자.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import asyncio
CONNECTION_STR = "{Service_Bus_Endpoint_Here}"
QUEUE_NAME = "{Queue_Name_Here}"
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
servicebus_client2 = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
servicebus_client3 = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
print("\n\n----------------------------------------------\n")
print("Sending messages to Service Bus")
def send_message(sender):
for x in range(1,11):
message = ServiceBusMessage(str(x))
sender.send_messages(message)
print(" number: ", str(x))
print("Sending messages finished\n")
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_message(sender)
print("Receiving messages from Service Bus")
print("----------------------------------------------\n")
async def receiver_001():
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
await asyncio.sleep(3)
print("receiver_001 Received: " + str(msg))
receiver.complete_message(msg)
async def receiver_002():
with servicebus_client2:
receiver = servicebus_client2.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
await asyncio.sleep(2)
print("receiver_002 Received: " + str(msg))
receiver.complete_message(msg)
async def receiver_003():
with servicebus_client3:
receiver = servicebus_client2.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
await asyncio.sleep(1)
print("receiver_003 Received: " + str(msg))
receiver.complete_message(msg)
async def main():
try:
await asyncio.wait([
asyncio.create_task(receiver_001()),
asyncio.create_task(receiver_002()),
asyncio.create_task(receiver_003())
])
except:
pass
if __name__=='__main__':
asyncio.run(main())
위 코드를 돌리게 되면 아래와 같은 화면을 볼 수 있다.
10까지 넣고, 동시에 3개의 consumer를 돌리더라도 같은 msg를 들고오지 않는 부분을 확인할 수 있다.
3개의 consumer가 동시에 가져오는 부분을 보여주고자 짠 코드기에, queue에서 나갈때의 순서는 보장되지만, 처리되서 내 화면에서 뿌려지는 순서는 보장되지 않는 점은 감안해 주기 바란다.
아래 부분만 기억해도 남는 장사이다.
최소 한 번이상 처리 필요: Peek-lock
최소 한 번이상 처리 불필요: Receive and delete