[4th TeamProject] - Celery

장현웅·2023년 10월 27일
0

Celery에 대해 알아보기 전에 먼저 Task Cue에 대해서 알아보겠습니다.


테스트 큐

테스크 큐는 컴퓨터 프로그램에서 작업을 효율적으로 분배하고 처리하는 데 사용되는 메커니즘 중 하나로, 보통 스레드(Thread)나 컴퓨터 간에 작업을 나누는 데 사용됩니다. 주로 비동기적인 작업 처리와 병렬 처리를 지원하기 위해 사용됩니다.

스레드(Thread)는 컴퓨터 프로그램의 실행 흐름 또는 작업 단위를 나타내는 개념입니다. 스레드는 프로세스 내에서 실행되는 독립적인 작업 단위로, 여러 스레드가 하나의 프로세스 내에서 동시에 실행될 수 있습니다.

테스크 큐는 작업의 모음을 처리하는데 사용되며, 이 작업들은 "테스크(Task)"라고 불립니다.

테스크 큐는 작업(테스크)을 저장하는 대기열 형태의 자료 구조입니다. 이 큐에 작업이 추가되면, 이러한 작업은 순서대로 대기하게 됩니다.

테스크 큐에는 여러 종류의 작업(테스크)이 들어갈 수 있으며, 이러한 작업은 실행을 기다리는 상태입니다.

작업자 프로세스작업 큐에 대기 중인 작업(테스크)을 가져와서 처리하고, 작업을 완료한 후 결과를 반환하거나 다음 단계의 작업을 시작할 수 있습니다. 작업자 프로세스는 작업 큐에서 작업을 꺼내와서 실행하는 프로세스 또는 스레드입니다. 작업자 프로세스는 지속적으로 큐를 확인하고 작업 큐에 새 작업이 추가되면 이를 감지하고 처리를 시작합니다.

이러한 메커니즘을 사용하면 여러 작업이 동시에 처리될 수 있으며, 작업자 프로세스작업 큐에 있는 작업을 비동기적으로 처리하므로 전체 시스템이 빠르게 응답하고 효율적으로 동작할 수 있습니다.

쉽게 설명하면 우리가 여행을 계획할 때 비행기 스케줄 확인, 비행기 티켓 가격 조회, 비행 예약, 예약 확인 등 여러 작업이 순차적으로 이뤄져야 합니다. 이때, 각 작업(테스크)을 테스크 큐에 넣어두고 여러 작업자들에게 작업을 할당하면 작업자는 테스크 큐를 모니터링하고 작업을 수행합니다. 이렇게 작업자와 작업 큐를 사용하여 여러 작업을 효율적으로 분배하고 처리할 수 있습니다.


Celery

Celery는 테스크 큐 시스템의 한 예입니다.

Celery메시지를 통해 통신하며 일반적으로 브로커를 사용하여 클라이언트작업자 간의 통신을 중계합니다. 클라이언트가 작업을 시작하려면 메시지를 큐에 추가하고, 브로가 해당 메시지를 작업자에 전달합니다. Celery 시스템은 여러 작업자와 브로커로 구성될 수 있습니다.

만약 항공 사이트에서 비행편을 검색하는 작업을 큐에 추가한다면, 다수의 작업자들 중 하나가 이 작업을 수행합니다.

Celery는 작업을 조직하고 처리하기 위해 메시지를 보내고 받기 위한 메시지 전송 방식이 필요합니다. RabbitMQRedis는 Celery에서 사용되는 브로커(메시지 중계기) 전송 방식으로, 다음과 같이 작동합니다.

브로커 선택

  • RabbitMQ
    이 브로커는 메시지 큐 시스템으로, 작업자(worker)와 클라이언트 간의 효율적인 통신을 도와줍니다. 작업을 요청하는 클라이언트는 메시지를 큐(Queue)에 넣고, 작업자는 이 큐에서 메시지를 가져와 작업을 수행합니다. RabbitMQ는 메시지를 안정적으로 전달하고 저장할 수 있는 기능을 제공하여 데이터의 무결성을 보장합니다.

    • 설치

      • Ubuntu or Debian를 사용하는 경우 : sudo apt-get install rabbitmq-server
      • Docker에서 실행 : docker run -d -p 5672:5672 rabbitmq

      명령이 완료되면, Starting rabbitmq-server: SUCCESS 이런 메시지를 볼 수 있습니다. 브로커(RabbitMQ)는 이미 실행 중이며 메시지를 이동할 준비가 되어 있습니다.

  • Redis
    Redis는 빠르고 가벼운 메모리 기반 데이터 스토어로, Redis를 사용하면 작업 큐와 메시지 브로커로 사용하여 메시지를 신속하게 처리할 수 있습니다. Redis는 분산 환경에서도 잘 작동하며, 메모리를 활용하여 높은 성능을 제공합니다. Redis는 갑작스러운 종료나 정전이 발생할 경우 데이터 손실에 더 취약합니다.

    • 설치
      • Docker에서 실행 : docker run -d -p 6379:6379 redis

셀러리 설치

pip install celery

Django 프로젝트에서 Celery 연동

프로젝트 디렉토리 레이아웃은 아래와 같습니다.

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py
  - app1/
    - tasks.py
    - models.py
  - app2/
    - tasks.py
    - models.py
    

[ init.py ]

from .celery import app as celery_app           # 이 코드는 Django 프로젝트 내부의 celery.py 또는 celery와 같은 파일에서 정의된 Celery 인스턴스(app 객체)를 가져옵니다. = Django와 Celery를 연동하는 단계 
                                                # 그런 다음, shared_task 데코레이터를 이용하여 Celery가 수행할 작업들을 import할 필요 없이 가져올 수 있습니다.
__all__ = ('celery_app',)

작업 생성 및 등록

가장 먼저 필요한 것은 Celery 인스턴스입니다. Celery 애플리케이션이라고 부르거나 줄여서 앱이라고 부릅니다. 이 인스턴스는 작업 생성, 작업자 관리 등 Celery에서 수행하려는 모든 작업의 시작점으로 사용되므로 다른 모듈에서 이를 가져올 수 있어야 합니다.

  1. Django 프로젝트에서 Celery를 설정합니다. 프로젝트 루트 디렉토리에 celery.py 파일을 만들고 Celery 인스턴스 = app을 생성합니다.

[ settings.py ]

CELERY_BROKER_URL = 'amqp://guest@localhost:5672//' # Celery와 같은 작업 대기열 시스템에서 작업을 비동기적으로 처리하기 위해 활용되는 메시지 브로커입니다.

CELERY_RESULT_BACKEND = 'rpc://'                    # 정의한 url은 RabbitMQ 메시지 브로커 서버에 연결하는데 사용되는 URL입니다.
                                                    # Celery 작업의 결과를 저장하는 백엔드 저장소를 설정하는 데 사용되는 환경 변수 또는 설정 옵션입니다. 이 설정은 Celery가 비동기 작업을 처리하고 작업의 결과를 저장하는 방법을 지정합니다.
CELERY_RESULT_EXPIRES = 3600                        # rpc://로 설정하면 작업 결과가 메시지 브로커에 저장되며 작업 수행자는 해당 결과를 검색합니다.
                                                    # 작업 결과를 설정된 시간(초) 동안 보관합니다.
import os
from django.conf import settings
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'facebug_back_end.settings')                                                            # Django 프로젝트의 설정 모듈을 지정하기 위한 코드입니다. Celery 설정 파일인 celery.py에서 Django 설정을 참조하여 Celery 설정을 Django와 함께 사용하기 위해 추가해줍니다.

app = Celery('project_name', broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND, include=['user.tasks'])     # ask 모듈을 지정하여 Celery가 해당 모듈에서 task를 찾고 처리할 수 있도록 하고 작업을 백그라운드에서 진행할 수 있도록 메시지 브로커와 작업 결과 저장 백엔드 설정을 적용한 Celery 인스턴스(app)을 생성합니다.

app.config_from_object('django.conf:settings', namespace='CELERY')                                                                      # Django 설정에서 이름이 'CELERY_'로 시작하는 Celery 설정을 가져오는 경로를 지정합니다.

app.conf.update(result_expires=settings.CELERY_RESULT_EXPIRES,)                                                                         # Celery 애플리케이션의 설정 중 'result_expires' 옵션을 Django 설정 파일(settings.py)에서 가져온 값으로 업데이트하는 부분입니다. 'result_expires' : Celery 작업 결과가 저장될(유효한) 시간을 지정합니다.


if __name__ == '__main__':                                                                                                              # 현재 스크립트가 직접 실행될 때 Celery 애플리케이션을 시작합니다. Celery 애플리케이션을 시작하면 이 애플리케이션은 백그라운드에서 작업을 처리할 준비가 됩니다.
    app.start()

app.autodiscover_tasks()                                                                                                                # Django에서 Celery 작업 모듈을 자동으로 찾고 등록하는 메서드입니다. 이 메서드를 사용하면 Celery 애플리케이션에 대한 작업 모듈을 명시적으로 지정하지 않고도 Django 애플리케이션 내에서 사용 가능한 모든 작업을 자동으로 등록할 수 있습니다.
  1. 이제 Celery 작업을 정의할 Django 앱의 tasks.py 파일을 만듭니다.
from celery import shared_task
from django.core.mail import send_mail


@shared_task                                                                                # '@shared_task' : Django에서 Celery 작업을 정의하는 데 사용되는 데코레이터입니다. Celery를 통해 백그라운드에서 실행되어야 하는 비동기 작업을 만들 때 사용됩니다.
def send_verification_email(user_id, verification_url, recipient_email):                    # 이 작업은 Celery worker에서 실행될 때 백그라운드에서 비동기로 실행됩니다. 
    subject = '이메일 확인 링크'
    message = f'이메일 확인을 완료하려면 다음 링크를 클릭하세요: {verification_url}'
    from_email = 'hyeonwoongjang01@gmail.com'
    print(f'subject:{subject}')
    send_mail(subject, message, from_email, [recipient_email])
  • @shared_task 데코레이터
    작업은 앱에 tasks.py에 정의되어 있을 것입니다. 이 앱은 프로젝트 자체에 의존할 수 없기 때문에 앱 인스턴스를 직접 가져올 수도 없습니다.

@shared_task 데코레이터를 사용하면 구체적인 앱 인스턴스 없이 작업을 생성할 수 있습니다.

결과 보관

작업 상태를 추적하려면 Celery가 상태를 어딘가에 저장하거나 보내야 합니다.

  • 상태
    Celery는 작업의 현재 상태를 추적하고 이를 결과 백엔드에 저장합니다. 상태 정보와 결과는 결과 백엔드에 저장되며, 기본적으로는 RabbitMQ, Redis, 또는 다른 지원되는 메시지 브로커를 사용하여 관리됩니다. 결과 백엔드에 저장되는 정보에는 다음과 같은 내용이 포함될 수 있습니다

    • 작업 상태(States): 작업이 큐에서 대기 중, 실행 중, 완료되었거나 실패했는지를 나타내는 상태 정보가 저장됩니다. 예를 들어, 작업이 "PENDING" (대기 중), "STARTED" (실행 중), "SUCCESS" (성공) 또는 "FAILURE" (실패)와 같은 상태를 가질 수 있습니다.
    • 작업 결과(Result): 작업의 실행 결과가 저장됩니다. 이것은 작업이 성공적으로 완료된 경우 결과 값이고, 실패한 경우 예외 및 역추적(traceback) 정보가 포함될 수 있습니다.
    • 작업 메타데이터(Metadata): 작업에 대한 추가 메타데이터도 결과 백엔드에 저장될 수 있으며, 이는 작업 관련 정보를 포함합니다
  • 결과 백엔드
    작업을 추적하고 싶거나 반환 값이 필요한 경우 Celery는 나중에 검색할 수 있도록 상태를 어딘가에 저장하거나 보내야 합니다. SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid( )Redis 중에서 선택할 수 있는 여러 내장 결과 백엔드가 있습니다.

    • RPC 결과 백엔드(RabbitMQ/QPid)
      RPC 결과 백엔드( rpc:// )는 실제로 상태를 저장 하지 않고 메시지로 보내기 때문에 특별합니다. 이는 결과가 한 번만 검색될 수 있고 작업을 시작한 클라이언트에 의해서만 검색될 수 있다는 점에서 중요한 차이점입니다. 메시지는 기본적으로 일시적(비지속적)이므로 브로커가 다시 시작되면 결과가 사라집니다.

      • 상태를 임시 메시지로 다시 보내는 rpc 결과 백엔드를 사용하려는 경우
        app = Celery('tasks', backend='rpc://', broker='pyamqp://')
      • 결과 백엔드로 Redis를 사용하고 싶지만 여전히 RabbitMQ를 메시지 브로커로 사용하려는 경우(인기 있는 조합)
        app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
  • Django 프로젝트의 데이터베이스에 Celery 결과를 저장하고 싶은 경우

    • django-celery-results 라이브러리를 설치합니다

      pip install django-celery-results
    • settings.py에 설치한 라이브러리 추가

      INSTALLED_APPS = (
        ...,
        'django_celery_results',
      )
    • 데이터베이스 마이그레이션을 수행하여 Celery 데이터베이스 테이블을 생성

      python manage.py migrate django_celery_results
    • django-celery-results 백엔드를 사용하도록 Celery를 구성

      [ settings.py ]

        CELERY_RESULT_BACKEND = 'django-db'
      
        # 캐시 백엔드의 경우
        CELERY_CACHE_BACKEND = 'django-cache'
      
        # django의 CACHES 설정에 정의된 캐시를 사용할 수도 있습니다.
        CELERY_CACHE_BACKEND = 'default'
      
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
                'LOCATION': 'my_cache_table',
            }
        }

Celery 작업자 프로세스 시작하기

Celery 프로그램을 실행하여 Celery 작업을 수행할 작업자(Worker)을 실행시킵니다.

celery -A project_name worker --loglevel=INFO

가동하면 아래와 같은 결과가 나옵니다.

-------------- celery@DESKTOP-L81ERLR v5.3.4 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2023-10-25 01:29:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         facebug_back_end:0x180b8d9ba00        
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//      
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 12 
(eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery
 exchange=celery(direct) key=celery

[tasks]
  . user.tasks.send_verification_email

만약, 작업자(Worker)는 연결되었지만, Celery 작업을 인식하지 못한다면 Window환경 때문일 수도 있습니다. 그럴 때는, 아래 명령어로 실행하면 정상 실행됩니다.

pip install eventlet
celery -A <module> worker -l info -P eventlet

WindowsUnix 계열의 운영체제와는 다른 프로세스 처리 모델을 사용하므로 Celery와 같은 비동기 작업을 관리하기에 일부 문제가 발생할 수 있습니다.

eventlet은 비동기 이벤트 루프 라이브러리로, Celery와 함께 사용하여 Windows 환경에서 Celery 작업자(worker)를 실행하는 데 도움이 될 수 있습니다. eventlet을 설치하고 -P eventlet 플래그를 사용하여 Celery 작업자를 시작하면 Windows에서의 Celery 실행 문제를 완화할 수 있습니다.

이렇게 하면 Celery 작업자가 시작되고 등록된 작업을 실행할 수 있습니다.

작업 호출 (= 연결)

작업을 호출하려면 delay()메서드와 apply_async()메서드를 사용할 수 있습니다.

  • delay() 메서드
from .tasks import send_verification_email  # 작업을 정의한 모듈을 가져와야 합니다.

class RegisterView(APIView):
    def post(self, request):
        """사용자 정보를 받아 회원가입 합니다."""
        
        serializer = UserSerializer(data=request.data, context={'profile_img':request.FILES})
        
        if serializer.is_valid():
            user = serializer.save()
            
            # 이메일 확인 토큰 생성
            token = default_token_generator.make_token(user)
            uid = urlsafe_base64_encode(force_bytes(user.pk))
            
            # 이메일에 인증 링크 포함하여 보내기
            verification_url = f"http://127.0.0.1:8000/verify-email/{uid}/{token}/"
            
            send_verification_email.delay(user.id, verification_url, user.email)                        # 'delay' : Celery 작업을 예약하여 나중에 실행되도록 하는 메소드입니다. 'delay' 메소드를 호출하면 작업이 백그라운드에서 비동기적으로 실행됩니다.
            
            return Response({'message':'회원가입 성공'}, status=status.HTTP_201_CREATED)
        else:
            return Response({'message': serializer.errors}, status=status.HTTP_400_BAD_REQUEST)
  • apply_async() 메서드
from .tasks import send_verification_email  # 작업을 정의한 모듈을 가져와야 합니다.

send_verification_email.apply_async(args=(user.id, verification_url, user.email), countdown=10, priority=2)

# countdown 및 priority와 같은 .apply_async 메서드의 옵션을 사용하여 작업을 스케줄링하거나 우선 순위를 설정할 수 있습니다.

이렇게 하면 send_verification_email 작업을 10초 후에 실행하고 우선 순위를 2로 설정합니다.

이렇게 작업 큐와 Celery를 사용하여 Django 프로젝트에서 비동기 작업을 효과적으로 처리할 수 있는 방법에 대해 알아봤습니다.

1개의 댓글

comment-user-thumbnail
2023년 10월 30일

take cue와 celery이 무엇인지 어떻게 작동하는지에 대한 정리를 잘 해주셨네요👍

답글 달기