Redis Queue(RQ) 맛보기

dony·2024년 2월 8일
1

Parallel Processing

목록 보기
2/2
post-thumbnail

Redis Queue란?

Redis Queue는 작업 큐를 활용할 수 있는 간단한 Python 라이브러리이다. 사용하기 쉽도록 설계되어있으며, 웹 스택과도 쉽게 통합할 수 있다.

RQ worker를 여러 개 실행하면, 각 worker는 동일한 큐 혹은 다른 큐에서 작업을 가져와 독립적으로 수행할 수 있다.


테스트 해보기

간단히 병렬 처리를 시도해보자. 간단한 코드는 공식 문서에도 나와 있고, 해외 블로그에도 설명이 되어 있다. 다만 Windows 환경이라면 redis를 띄우는데 삽질을 조금 하게 될 수도 있다. Unix 기반 환경이나 docker 컨테이너를 이용하는 것이 테스트하기 편하다. 이런 저런 권한 문제와 fork 에러로 부딪히다 WSL을 활용해서 진행했다.

간단한 테스트로, 특정 url에 접속해서 공백 기준 문자열의 수를 계산하는 것과 숫자를 출력해보는 작업을 진행해보았다. 아래는 간단한 task 함수들이다.

# tasks.py
import time
import requests


def print_task(seconds):
    print("Starting task")
    for num in range(seconds):
        print(num, ".Hello World!")
        time.sleep(1)
    print("Task completed")


def print_numbers(seconds):
    print("Starting num task")
    for num in range(seconds):
        print(num)
        time.sleep(1)
    print("Task to print_numbers completed")


def count_words_at_url(url):
    time.sleep(10)
    response = requests.get(url)
    return len(response.text.split())

먼저 숫자를 출력해보는 테스트다. 총 3개의 프로세스를 켜서 redis-cli를 실행하고, worker를 2개 생성했다. worker의 경우 rq worker --with-scheduler 라는 명령어를 수행해주면 된다. 이때 주의해야할 점은 test 파일과 같은 디렉토리에서 생성해야 한다. 이후 test.py를 수행하면 worker들이 차례로 작업을 가져가 수행한다.

# test.py
from datetime import timedelta
from redis import Redis
from rq import Queue
import tasks


queue = Queue(connection=Redis())

def queue_tasks():
    queue.enqueue(tasks.print_task, 5)
    queue.enqueue(tasks.print_task, 5)
    queue.enqueue(tasks.print_task, 5)
    queue.enqueue_in(timedelta(seconds=10), tasks.print_numbers, 5)

def main():
    queue_tasks()

if __name__ == "__main__":
    main()

다음은 입력된 url에서 텍스트를 받아오고, 단어의 개수를 출력하는 테스트이다.

# test2.py
from redis import Redis
from rq import Queue
import tasks
import time

queue = Queue(connection=Redis())

if __name__ == "__main__":
    urls = [
        'http://www.google.com',
        'http://www.naver.com',
        'http://www.bing.com',
    ]
    
    jobs = [queue.enqueue(tasks.count_words_at_url, url) for url in urls]
    print(f"{len(jobs)} jobs queued.")


    while not all(job.is_finished for job in jobs):
        time.sleep(1)

    for job in jobs:
        print(f"Result for {job.args[0]}: {job.result}")

3개의 url로 실행해본 결과 다음과 같이 잘 출력되는 것을 볼 수 있다.

worker들은 다음과 같이 작업을 나누어 수행한다. 총 3개의 url이 주어졌기 때문에, 1개 2개로 나뉘어진 것을 볼 수 있다.


마주친 문제들

윈도우 fork 에러

AttributeError: module 'os' has no attribute 'fork'

이전에도 마주쳤었던 것 같은 에러.. 아마 윈도우를 쓰는 사람이라면 한 번쯤은 겪어봤을 것이다. 이는 Unix와 Windows 시스템 자체의 프로세스 생성 방식이 다르기 때문이다.

Unix에서 fork()는 프로세스 복제라고 이해할 수 있다. 자식 프로세스가 생성되면 부모 프로세스와 완전히 동일한 소스코드를 갖게되며, 뿐만 아니라 부모의 PCB까지 물려 받는다. 이때 copy-on-write 방식을 사용하여 메모리를 효율적으로 관리하는데, 때문에 자식 프로세스는 fork() 호출 이후 처음에는 부모 프로세스의 메모리 페이지를 그대로 가리키고 있다.

반면 Windows는 CreateProcess라는 다른 메커니즘을 사용한다. fork()와는 근본적으로 다른데, 새 프로세스를 위한 새로운 메모리 공간을 할당하고 실행 파일의 내용을 그 공간에 로드한다. Windows의 경우 copy-on-write 방식을 기본적으로 지원하지 않기 때문에, fork() 구현 자체가 어렵다.

이 에러 때문에 계속 테스트 진행이 불가해서 WSL로 옮겨갔다...


Redis RDB snapshot

MISCONF Redis is configured to save RDB snapshots, ...

Redis는 특정 시점에 데이터를 RDB 파일로 디스크에 저장한다. 이 과정을 snapshot이라고 하며, 덕분에 장애 발생시 복구가 가능한다는 장점이 있다. 문제는 파일 저장 실패 시 Redis의 모든 데이터 쓰기가 불가능하게 된다. 이때 위와 같은 에러가 발생하는 듯 하다. WSL을 사용하면서 생기는 여러 문제들이 있는데 그중 하나로 보인다..

당장은 테이터를 저장하는 기능이 필요가 없으므로 아래와 같이 해당 기능을 꺼두었다. redis-cli를 실행한 후 입력하면 된다.

127.0.0.1:6379> config set stop-writes-on-bgsave-error no

conf 파일에 직접 설정하는 방법도 있다.

# vi /etc/redis/redis.conf
stop-writes-on-bgsave-error no

함수 관련 버그

ValueError: Invalid attribute name: tasks.print_numbers

테스트 중 task 함수와 큐에 작업을 넣는 메인 함수를 같은 스크립트 파일 두었더니 위와 같은 에러가 생긴다. 스택오버플로우에서는 버그라고 하던데, 어쨌든 파일을 나누기만 하면 된다. tasks.py와 test.py로 따로 나누고, import 해오는 방식으로 해결했다.


Reference

RQ 공식문서: https://python-rq.org/
간단한 테스트 블로그: https://www.twilio.com/en-us/blog/first-task-rq-redis-python
os.fork 에러: https://www.sysnet.pe.kr/2/0/12811
os.fork: https://wslog.dev/fork-exec
RDB 파일 저장 설정: https://server-talk.tistory.com/477
모듈 버그: https://stackoverflow.com/questions/31012817/python-reddis-queue-valueerror-functions-from-the-main-module-cannot-be-pro

0개의 댓글