작업의 효율성을 높이기 위해 queue 에 task 를 저장해놓고, 현재 실행 중인 task 가 끝나면 queue 에 있는 다음 task 가 실행되도록 한다.
┌───────────┐ Redis ┌─────────────┐
│ producer │ → Queue → │ consumer │
└───────────┘ └─────────────┘
consumer.py : 큐에서 데이터 가져오기
producer.py : 데이터를 큐에 추가
monitor.py : 모니터링 하는 py 파일
import redis
import argparse
import subprocess
import uuid
import time
def dequeue(redis_client, queue_name, timeout):
print("[Consumer] Waiting for tasks...")
while True:
task = redis_client.brpop(queue_name, timeout)
if task:
_, command = task
command = command.decode() # 바이트 -> 문자열
# 각 명령어를 고유 ID와 함께 추적
task_id = str(uuid.uuid4())
print(f"[Consumer] Picked Task ID: {task_id}")
print(f"[Consumer] Executing Command: {command}")
# 1) RUNNING 상태 설정
redis_client.set(f"task:{task_id}:command", command)
redis_client.set(f"task:{task_id}:status", "RUNNING")
# 2) 비동기 실행
process = subprocess.Popen(command, shell=True)
pid = process.pid
redis_client.set(f"task:{task_id}:pid", pid)
# 3) 프로세스 종료 대기 (동기이지만, popen 자체는 내부적으로 비동기로 동작)
return_code = process.wait()
if return_code == 0:
redis_client.set(f"task:{task_id}:status", "DONE")
print(f"[Consumer] Task {task_id} completed successfully.")
else:
redis_client.set(f"task:{task_id}:status", "FAILED")
print(f"[Consumer] Task {task_id} failed with code {return_code}.")
else:
print("[Consumer] Queue empty or timeout reached. Exiting...")
break
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Redis queue consumer")
parser.add_argument('-q', '--queue-name', help="Redis queue name", default="task_queue")
parser.add_argument('-t', '--timeout', help="Timeout in seconds for queue blocking", default=0, type=int)
args = parser.parse_args()
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
queue_name = args.queue_name
timeout = args.timeout
dequeue(redis_client, queue_name, timeout)
import redis
import argparse
import sys
def enqueue(redis_client, queue_name, item):
# 작업 큐에 task push
redis_client.lpush(queue_name, item)
print(f"[Producer] Enqueued item: {item}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-q', '--queue_name', help="redis queue name", default="task_queue", required=True)
# consumer가 실제로 실행할 명령어 입력 (ex: nohup python train.py ...)
parser.add_argument('--c', '--command', dest='command', help="CLI command", default="ls", required=True)
args = parser.parse_args()
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
queue_name = args.queue_name
cli_command = args.command
# 작업 적재
enqueue(redis_client, queue_name, cli_command)
monitor.py 를 실행하면
현재 어떤 task 가 실행되고 있는 지, 죽은 프로세스가 있다면 어떤 건지 , pid 등을 알 수 있다.
import redis
import subprocess
import re
def is_process_alive(pid):
#간단히 ps 명령어를 통해 프로세스 생존 여부 확인
try:
# ps -p <pid> 명령어로 프로세스 존재 여부 확인
result = subprocess.run(["ps", "-p", str(pid)], capture_output=True, text=True)
# returncode == 0이면 프로세스 존재
return (result.returncode == 0)
except Exception as e:
print(f"[Monitor] Error checking process: {e}")
return False
def kill_process(pid):
try:
subprocess.run(["kill", "-9", str(pid)], check=True)
print(f"[Monitor] Killed PID {pid} by force.")
except Exception as e:
print(f"[Monitor] Failed to kill PID {pid}: {e}")
def monitor_redis_tasks():
r = redis.StrictRedis(host='localhost', port=6379, db=0)
cursor = 0
running_tasks = []
while True:
cursor, keys = r.scan(cursor=cursor, match="task:*:status")
for key in keys:
status = r.get(key).decode()
if status == "RUNNING":
task_id = key.decode().split(":")[1]
pid_key = f"task:{task_id}:pid"
pid_value = r.get(pid_key)
if pid_value is None:
print(f"[Monitor] Task {task_id} RUNNING, but no PID found.")
continue
pid_value = pid_value.decode()
# 프로세스 살아있는지 확인
if is_process_alive(pid_value):
print(f"[Monitor] Task {task_id} is still running (PID={pid_value}).")
else:
print(f"[Monitor] Task {task_id} marked RUNNING, but process is dead.")
# 상태를 FAILED로 업데이트
r.set(f"task:{task_id}:status", "FAILED")
if cursor == 0:
break
print("[Monitor] Monitoring done.")
if __name__ == "__main__":
monitor_redis_tasks()
1. redis server 실행
redis-server
2. consumer 실행 (작업 대기)
nohup python consumer.py -q task_queue > output.log 2>&1 &
"consumer is waiting for tasks.." 가 뜨면 작업을 기다리는 상태.
3. producer 실행
작업 예시 명령어
# 예시
python producer.py -q task_queue \
--c "nohup python train.py --data_dir /path/to/data > output.log 2>&1 &"
-q 또는 --queue_name으로 Redis 큐의 이름을 입력받는다.
--c / --command 를 통해서 Consumer가 실행할 명령어를 입력받는다.
현재 작업 queue 에 몇개의 task 가 남아있는 지 확인하는 명령어
redis-cli LLEN task_queue
queue에 들어있는 task 목록 확인 명령어
redis-cli LRANGE task_queue 0 -1