import threading
import redis
import time
conn = redis.Redis(host='localhost', port=6379, db=0)
def run_list_queue():
# sender
threading.Thread(target=sender, args=(10,)).start()
# receiver
while True:
print("waiting message...")
res = conn.blpop('test_list', timeout=0)
if res is not None:
print(res)
time.sleep(0.5)
def sender(n: int):
time.sleep(1)
for num in range(n):
time.sleep(1)
conn.rpush('test_list', f'message #{num}')
if __name__ == '__main__':
run_list_queue()
파이썬에서 Redis List를 사용해 메세징 큐를 구현해 보았다. publish하는 시점에 subscriber가 없으면 데이터가 유실되는 Pub/Sub과는 다르게 메세지를 넣어 둔 후 시간이 지나도 메세지를 가져오는것이 가능하다.
위의 코드를 실행할 때 인상깊었던 부분은 blpop
에서 꺼내올 데이터가 없다면 대기를 한다는 점이다.
아래의 실행결과를 보면 가져올 데티어가 없으면 while문이 계속해서 반복되는것이 아니라 blpop
을 하는 지점에서 멈춰있는 것을 확인할 수 있다.
waiting message...
(b'test_list', b'message #0')
waiting message...
(b'test_list', b'message #1')
waiting message...
(b'test_list', b'message #2')
waiting message...
(b'test_list', b'message #3')
waiting message...
(b'test_list', b'message #4')
waiting message...
(b'test_list', b'message #5')
waiting message...
(b'test_list', b'message #6')
waiting message...
(b'test_list', b'message #7')
waiting message...
(b'test_list', b'message #8')
waiting message...
(b'test_list', b'message #9')
waiting message...
위의 예시에서는 rpush
를 사용했지만 rpushx
를 사용하면 키가 존재할 때만 리스트에 데이터를 추가할 수 있다. 트위터는 해당 기능을 사용해 트위터를 자주 이용하던 유저의 타임라인에만 새로운 데이터를 캐싱하고 자주 사용하지 않는 유저에 대해서는 데이터를 캐싱하는 비효율적인 작업을 방지하는 캐싱전략에도 사용된다고 한다.
또한 위의 예시에서 사용한 blpop
은 lpop
과는 다르게 blocking 기능이 있기 때문에 제어권을 자신이 가지고 있어 Event Queue로써 기능할 수 있게 해준다.