추론 시간이 오래 걸리는 모델 서비스를 위한 Celery 구성을 기록한 글입니다.
더 좋은 방법이 있으면 알려주세요!
redis:
image: redis
container_name: redis
restart: always
rabbitmq:
image: rabbitmq
container_name: rabbitmq
restart: always
worker:
build: .
container_name: worker
restart: always
command: "celery -A app.worker.celery_worker worker --without-heartbeat --without-mingle -P solo -l info -E --concurrency=1 -Ofair"
depends_on:
- redis
- rabbitmq
prometheus:
image: prom/prometheus
container_name: prometheus
restart: always
volumes:
- ./prometheus/:/etc/prometheus/
command:
- '--config.file=/etc/prometheus/prometheus.yml'
ports:
- 9090:9090
flower:
image: mher/flower
container_name: flower
environment:
- CELERY_BROKER_URL=amqp://guest@rabbitmq:5672//
- FLOWER_PORT=7777
ports:
- 7777:7777
grafana:
image: grafana/grafana
container_name: grafana
restart: always
depends_on:
- prometheus
ports:
- "3000:3000"
from celery import Celery
app = Celery("worker", backend="redis://redis:6379", broker="amqp://guest@rabbitmq:5672//")
# https://ichi.pro/ko/dib-leoning-algolijeum-eul-seobiseulo-silhaeng-117261469126622
app.conf.update(
broker_pool_limit=None,
task_acks_late=True,
broker_heartbeat=None,
worker_prefetch_multiplier=1,
task_track_started=True,
worker_send_task_events=True,
task_send_sent_event=True,
result_expires=86400, # one day,
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="Asia/Seoul",
enable_utc=False
)
from celery import uuid
def celery_start():
task_id = uuid()
celery_test.apply_async(...),task_id=task_id)
return task_id
apply_async()를 이용하여 uuid로 생성한 task_id를 이용하여 실행된 함수의 상태가 backend로 지정한 redis에 저장되며 우리는 task_id를 이용하여 task info를알수있다.
from celery.contrib.abortable import AbortableTask
@app.task(bind=True, max_retries=0, base=AbortableTask)
def celery_test():
...
from celery.result import AsyncResult
def celery_state(task_id):
task = AsyncResult(id=task_id, app=app)
return {'state':task.state, 'info'=task.info}
task_id를 이용하여 celery의 상태값인 STARTED, SUCCESS, PENDING, REVOKED 같은 정보를 알 수 있다. SUCCESS상태면 추론 완료, REVOKED 상태면 추론 중단 등 활용이 가능하다.
from celery.contrib.abortable import AbortableAsyncResult
from celery.result import AsyncResult
def celery_stop(task_id):
try:
AbortableAsyncResult(task_id).abort()
AsyncResult(id=task_id).revoke(terminate=True, signal="SIGKILL")
log.info("task kill")
return Response("success task kill")
except Exception:
return Response("fail task kill")
사용자가 추론시간이 긴 작업을 수행중 중단을 할 경우 celery는 계속 돌아가고 있기 떄문에 task를 중단 해줘야함
💡 Flower는 따로 설정 안하셔도 됩니다.
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: flower
static_configs:
- targets: ['flower:5555']
localhost:9090 에서 flower가 검색되는지 확인합니다. flower가 검색이 안되면 각 설정의 URL들을 다시 확인합니다.
Prometheus Integration - Flower 1.0.1 documentation
💡 celery-monitoring-grafana-dashboard.json 파일은 밑 주소에서 확인할 수 있습니다.
https://github.com/mher/flower/blob/master/examples/celery-monitoring-grafana-dashboard.json
위 Grafana 설정대로 따라하셨으면 dashboard결과를 확인할 수 있습니다.
https://flower.readthedocs.io/en/latest/prometheus-integration.html
https://www.cloudamqp.com/docs/celery.html
https://daeguowl.tistory.com/157