: 방대한 양의 메시지를 처리하는 간단하고 유연하고 신뢰도 있는 분산시스템. 작업 큐(Task Queue)를 사용한 비동기 작업을 위해 사용할 수 있다.
: 말 그대로 태스크가 존재하는 큐이다. Celery는 작업 큐를 항상 모니터링하며 새로운 작업이 추가되는 경우 해당 작업을 실행한다.
: Celery에서는 메시지를 통해 통신하며 Broker가 작업을 명령하는 클라이언트와 작업을 수행하는 worker사이를 중재한다. 즉, 클라이언트가 작업 큐에 작업을 넣으면 Broker가 woker에게 메시지를 전달한다.
RabbitMQ v.s Redis
- 속도: Redis
→ 메모리에서 처리하므로 Redis가 훨씬 빠르다.
Redis는 초당 수백만개의 메시지 처리 가능, RabbitMQ는 초당 수만개 처리 가능- 안정성: RabbitMQ
→ Redis의 경우 서버에서 장애가 발생하면 읽지 않은 데이터가 손실될 위험이 있다.- 가용성: RabbitMQ
→ RabbitMQ는 클러스터링이 가능한 반면 Redis는 특정 버전 이상에서만 지원한다.
: Task의 상태를 추적하기 위해 Celery가 결과를 저장하는 장소를 의미한다.
SQLAlchemy, Django ORM, MongoDB, Memcached, Redis, RPC를 포함해 사용자가 직접 정의할 수도 있다.
Result Backend와 Message Broker를 같게 설정해도된다.
config 등 settings.py
가 있는 폴더에 celery.py
를 추가한다.
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
settings.py에 celery에 대한 설정을 진행한다.
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_TRACK_STARTED = True
CELERY_BROKER_URL = f'redis://:localhost6379/0'
CELERY_RESULT_BACKEND = f'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
__inin__.py
를 설정한다.
from .celery import app as celery_app
__all__ = ('celery_app',)
@shared_task
데코레이터가 앱을 사용할 수 있도록 앱이 로드 된다.원하는 Django app에 tasks.py
를 구현한다.
import os
from datetime import datetime
from pathlib import Path
from .models import LearningLog
from celery import shared_task
def delete_on_failure(self, exc, task_id, args, kwargs, einfo):
print(kwargs)
learning_log_id = kwargs.get("learning_log_id")
print(f"delete {learning_log_id}")
LearningLog.objects.filter(pk=learning_log_id).delete()
@shared_task(on_failure=delete_on_failure)
def async_train_model(*args, **kargs):
"""비동기 로직 실행"""
return f"model training finished. {learning_log_id}"
celery.Task에 정의되어 있으며 이를 override하는 것과 같다.
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Error handler.
This is run by the worker when the task fails.
Arguments:
exc (Exception): The exception raised by the task.
task_id (str): Unique id of the failed task.
args (Tuple): Original arguments for the task that failed.
kwargs (Dict): Original keyword arguments for the task that failed.
einfo (~billiard.einfo.ExceptionInfo): Exception information.
Returns:
None: The return value of this handler is ignored.
"""
작성한 작업을 호출한다.
@api_view(["Post"])
def start_train(request):
learning_log = LearningLog.objects.create(file_path=BASE_DIR, in_use=False)
task = async_train_model.delay(
learning_log_id=learning_log.learning_log_id,
save_path=SAVE_PATH,
json_path=JSON_PATH,
default_model_path=DEFAULT_MODEL_PATH,
)
return Response({"message": "model training started.", "task_id": task.id})