Celery - Django에서 비동기 작업을 해보자.

Junseo Jung·2024년 4월 18일
0
post-thumbnail

Celery?

: 방대한 양의 메시지를 처리하는 간단하고 유연하고 신뢰도 있는 분산시스템. 작업 큐(Task Queue)를 사용한 비동기 작업을 위해 사용할 수 있다.

Task Queue

: 말 그대로 태스크가 존재하는 큐이다. Celery는 작업 큐를 항상 모니터링하며 새로운 작업이 추가되는 경우 해당 작업을 실행한다.

Broker

: Celery에서는 메시지를 통해 통신하며 Broker가 작업을 명령하는 클라이언트와 작업을 수행하는 worker사이를 중재한다. 즉, 클라이언트가 작업 큐에 작업을 넣으면 Broker가 woker에게 메시지를 전달한다.

  • RabbitMQ : 목적 자체가 Message Broker이다. 데이터 생산자가 RabbitMQ에 데이터를 전송하면 해당 데이터는 규칙에 따라 해당 대기열로 라우팅된다. 해당 메시지는 데이터 소비자가 수신할 때 까지 대기열에 상주한다. 만약 대기열이 가득 차게 되면 새로운 메시지를 게시하지 못하게 한다.
  • Redis : key-value 데이터베이스로 메시지 브로커로 사용 시 데이터 생산자가 데이터를 전송하면 Redis가 해당 데이터에 관심있는 구독자를 식별한다. 이후 모든 구독자에게 메시지를 전송한다.

RabbitMQ v.s Redis

  • 속도: Redis
    → 메모리에서 처리하므로 Redis가 훨씬 빠르다.
    Redis는 초당 수백만개의 메시지 처리 가능, RabbitMQ는 초당 수만개 처리 가능
  • 안정성: RabbitMQ
    → Redis의 경우 서버에서 장애가 발생하면 읽지 않은 데이터가 손실될 위험이 있다.
  • 가용성: RabbitMQ
    → RabbitMQ는 클러스터링이 가능한 반면 Redis는 특정 버전 이상에서만 지원한다.

Result Backend

: Task의 상태를 추적하기 위해 Celery가 결과를 저장하는 장소를 의미한다.

SQLAlchemy, Django ORM, MongoDB, Memcached, Redis, RPC를 포함해 사용자가 직접 정의할 수도 있다.

  • Result Backend가 정의되면 AsyncResult를 통해 해당 Task의 상태를 확인할 수 있다.
  • Redis등으로 설정 후 GUI 툴을 활용해 결과를 확인할 수 있다.

Result Backend와 Message Broker를 같게 설정해도된다.


Djnago와 Celery

설정 방법

  1. config 등 settings.py가 있는 폴더에 celery.py를 추가한다.

    import os
    
    from celery import Celery
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
    
    app = Celery('config')
    
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    app.autodiscover_tasks()
    
    @app.task(bind=True, ignore_result=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
  2. settings.py에 celery에 대한 설정을 진행한다.

    CELERY_TIMEZONE = 'UTC'
    CELERY_TASK_TRACK_STARTED = True
    CELERY_BROKER_URL = f'redis://:localhost6379/0'
    CELERY_RESULT_BACKEND = f'redis://localhost:6379/0'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_SERIALIZER = 'json'
  3. __inin__.py를 설정한다.

    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    • 위 과정들을 통해 Django가 시작될 때 @shared_task 데코레이터가 앱을 사용할 수 있도록 앱이 로드 된다.
  4. 원하는 Django app에 tasks.py를 구현한다.

    import os
    from datetime import datetime
    from pathlib import Path
    
    from .models import LearningLog
    from celery import shared_task
    
    def delete_on_failure(self, exc, task_id, args, kwargs, einfo):
        print(kwargs)
        learning_log_id = kwargs.get("learning_log_id")
        print(f"delete {learning_log_id}")
        LearningLog.objects.filter(pk=learning_log_id).delete()
    
    @shared_task(on_failure=delete_on_failure)
    def async_train_model(*args, **kargs):
        
        """비동기 로직 실행"""
        
        return f"model training finished. {learning_log_id}"
    • on_failure: 작업 실행 중 예외 발생 시 실행될 함수를 정의

      celery.Task에 정의되어 있으며 이를 override하는 것과 같다.

      Exception handling in Celery?

      def on_failure(self, exc, task_id, args, kwargs, einfo):
          """Error handler.
      
          This is run by the worker when the task fails.
      
          Arguments:
              exc (Exception): The exception raised by the task.
              task_id (str): Unique id of the failed task.
              args (Tuple): Original arguments for the task that failed.
              kwargs (Dict): Original keyword arguments for the task that failed.
              einfo (~billiard.einfo.ExceptionInfo): Exception information.
      
          Returns:
              None: The return value of this handler is ignored.
          """
  5. 작성한 작업을 호출한다.

    @api_view(["Post"])
    def start_train(request):
        learning_log = LearningLog.objects.create(file_path=BASE_DIR, in_use=False)
    
        task = async_train_model.delay(
            learning_log_id=learning_log.learning_log_id,
            save_path=SAVE_PATH,
            json_path=JSON_PATH,
            default_model_path=DEFAULT_MODEL_PATH,
        )
        return Response({"message": "model training started.", "task_id": task.id})

참고.

Getting Started — Celery 5.3.6 documentation

RabbitMQ와 Redis 비교 - 게시/구독 메시징 시스템 간의 차이점 - AWS

0개의 댓글

관련 채용 정보