[Redis] 작업 Queue 만들기

heejini·2025년 1월 16일
1
post-thumbnail

Redis로 작업 대기 queue 만들기

작업의 효율성을 높이기 위해 queue 에 task 를 저장해놓고, 현재 실행 중인 task 가 끝나면 queue 에 있는 다음 task 가 실행되도록 한다.

Structure

┌───────────┐      Redis          ┌─────────────┐
│ producer │  →  Queue →         │ consumer   │
└───────────┘                     └─────────────┘

consumer.py : 큐에서 데이터 가져오기
producer.py : 데이터를 큐에 추가
monitor.py  : 모니터링 하는 py 파일

Code

consumer.py

import redis
import argparse
import subprocess
import uuid
import time

def dequeue(redis_client, queue_name, timeout):
    print("[Consumer] Waiting for tasks...")
    while True:
        task = redis_client.brpop(queue_name, timeout)
        if task:
            _, command = task
            command = command.decode()  # 바이트 -> 문자열
            # 각 명령어를 고유 ID와 함께 추적
            task_id = str(uuid.uuid4())

            print(f"[Consumer] Picked Task ID: {task_id}")
            print(f"[Consumer] Executing Command: {command}")

            # 1) RUNNING 상태 설정
            redis_client.set(f"task:{task_id}:command", command)
            redis_client.set(f"task:{task_id}:status", "RUNNING")

            # 2) 비동기 실행
            process = subprocess.Popen(command, shell=True)
            pid = process.pid
            redis_client.set(f"task:{task_id}:pid", pid)

            # 3) 프로세스 종료 대기 (동기이지만, popen 자체는 내부적으로 비동기로 동작)
            return_code = process.wait()
            if return_code == 0:
                redis_client.set(f"task:{task_id}:status", "DONE")
                print(f"[Consumer] Task {task_id} completed successfully.")
            else:
                redis_client.set(f"task:{task_id}:status", "FAILED")
                print(f"[Consumer] Task {task_id} failed with code {return_code}.")
        else:
            print("[Consumer] Queue empty or timeout reached. Exiting...")
            break

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Redis queue consumer")
    parser.add_argument('-q', '--queue-name', help="Redis queue name", default="task_queue")
    parser.add_argument('-t', '--timeout', help="Timeout in seconds for queue blocking", default=0, type=int)

    args = parser.parse_args()

    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

    queue_name = args.queue_name
    timeout = args.timeout

    dequeue(redis_client, queue_name, timeout)

producer.py

import redis
import argparse
import sys

def enqueue(redis_client, queue_name, item):
    # 작업 큐에 task push
    redis_client.lpush(queue_name, item)
    print(f"[Producer] Enqueued item: {item}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--queue_name', help="redis queue name", default="task_queue", required=True)
    # consumer가 실제로 실행할 명령어 입력 (ex: nohup python train.py ...)
    parser.add_argument('--c', '--command', dest='command', help="CLI command", default="ls", required=True)

    args = parser.parse_args()

    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

    queue_name = args.queue_name
    cli_command = args.command  

    # 작업 적재
    enqueue(redis_client, queue_name, cli_command)

monitor.py

monitor.py 를 실행하면
현재 어떤 task 가 실행되고 있는 지, 죽은 프로세스가 있다면 어떤 건지 , pid 등을 알 수 있다.

import redis
import subprocess
import re

def is_process_alive(pid):
    #간단히 ps 명령어를 통해 프로세스 생존 여부 확인 
    try:
        # ps -p <pid> 명령어로 프로세스 존재 여부 확인
        result = subprocess.run(["ps", "-p", str(pid)], capture_output=True, text=True)
        # returncode == 0이면 프로세스 존재
        return (result.returncode == 0)
    except Exception as e:
        print(f"[Monitor] Error checking process: {e}")
        return False

def kill_process(pid):
    try:
        subprocess.run(["kill", "-9", str(pid)], check=True)
        print(f"[Monitor] Killed PID {pid} by force.")
    except Exception as e:
        print(f"[Monitor] Failed to kill PID {pid}: {e}")

def monitor_redis_tasks():
    r = redis.StrictRedis(host='localhost', port=6379, db=0)

    cursor = 0
    running_tasks = []
    while True:
        cursor, keys = r.scan(cursor=cursor, match="task:*:status")
        for key in keys:
            status = r.get(key).decode()
            if status == "RUNNING":
                task_id = key.decode().split(":")[1]

                pid_key = f"task:{task_id}:pid"
                pid_value = r.get(pid_key)
                if pid_value is None:
                    print(f"[Monitor] Task {task_id} RUNNING, but no PID found.")
                    continue
                pid_value = pid_value.decode()

                # 프로세스 살아있는지 확인
                if is_process_alive(pid_value):
                    print(f"[Monitor] Task {task_id} is still running (PID={pid_value}).")
                else:
                    print(f"[Monitor] Task {task_id} marked RUNNING, but process is dead.")
                    # 상태를 FAILED로 업데이트
                    r.set(f"task:{task_id}:status", "FAILED")
        if cursor == 0:
            break
    
    print("[Monitor] Monitoring done.")

if __name__ == "__main__":
    monitor_redis_tasks()

사용 방법

1. redis server 실행

redis-server

2. consumer 실행 (작업 대기)

nohup python consumer.py -q task_queue > output.log 2>&1 &

"consumer is waiting for tasks.." 가 뜨면 작업을 기다리는 상태.

3. producer 실행

작업 예시 명령어

# 예시 
python producer.py -q task_queue \
   --c "nohup python train.py --data_dir /path/to/data > output.log 2>&1 &"    

-q 또는 --queue_name으로 Redis 큐의 이름을 입력받는다.
--c / --command 를 통해서 Consumer가 실행할 명령어를 입력받는다.

현재 작업 queue 에 몇개의 task 가 남아있는 지 확인하는 명령어

redis-cli LLEN task_queue

queue에 들어있는 task 목록 확인 명령어

redis-cli LRANGE task_queue 0 -1

0개의 댓글