Celery
에 대해 알아보기 전에 먼저 Task Cue
에 대해서 알아보겠습니다.
테스크 큐
는 컴퓨터 프로그램에서 작업을 효율적으로 분배하고 처리하는 데 사용되는 메커니즘 중 하나로, 보통 스레드(Thread)
나 컴퓨터 간에 작업을 나누는 데 사용됩니다. 주로 비동기적인 작업 처리와 병렬 처리를 지원하기 위해 사용됩니다.
스레드(Thread)
는 컴퓨터 프로그램의 실행 흐름 또는 작업 단위를 나타내는 개념입니다. 스레드는 프로세스 내에서 실행되는 독립적인 작업 단위로, 여러 스레드가 하나의 프로세스 내에서 동시에 실행될 수 있습니다.
테스크 큐
는 작업의 모음을 처리하는데 사용되며, 이 작업들은 "테스크(Task)
"라고 불립니다.
테스크 큐
는 작업(테스크
)을 저장하는 대기열 형태의 자료 구조입니다. 이 큐에 작업이 추가되면, 이러한 작업은 순서대로 대기하게 됩니다.
테스크 큐
에는 여러 종류의 작업(테스크
)이 들어갈 수 있으며, 이러한 작업은 실행을 기다리는 상태입니다.
작업자 프로세스
는 작업 큐
에 대기 중인 작업(테스크
)을 가져와서 처리하고, 작업을 완료한 후 결과를 반환하거나 다음 단계의 작업을 시작할 수 있습니다. 작업자 프로세스는 작업 큐에서 작업을 꺼내와서 실행하는 프로세스 또는 스레드입니다. 작업자 프로세스는 지속적으로 큐를 확인하고 작업 큐에 새 작업이 추가되면 이를 감지하고 처리를 시작합니다.
이러한 메커니즘을 사용하면 여러 작업이 동시에 처리될 수 있으며, 작업자 프로세스
는 작업 큐
에 있는 작업을 비동기적으로 처리하므로 전체 시스템이 빠르게 응답하고 효율적으로 동작할 수 있습니다.
쉽게 설명하면 우리가 여행을 계획할 때 비행기 스케줄 확인, 비행기 티켓 가격 조회, 비행 예약, 예약 확인 등 여러 작업이 순차적으로 이뤄져야 합니다. 이때, 각 작업(테스크
)을 테스크 큐
에 넣어두고 여러 작업자들에게 작업을 할당하면 작업자는 테스크 큐를 모니터링하고 작업을 수행합니다. 이렇게 작업자와 작업 큐를 사용하여 여러 작업을 효율적으로 분배하고 처리할 수 있습니다.
Celery
는 테스크 큐 시스템의 한 예입니다.
Celery
는 메시지
를 통해 통신하며 일반적으로 브로커
를 사용하여 클라이언트
와 작업자
간의 통신을 중계합니다. 클라이언트가 작업을 시작하려면 메시지를 큐에 추가하고, 브로가 해당 메시지를 작업자에 전달합니다. Celery 시스템은 여러 작업자와 브로커로 구성될 수 있습니다.
만약 항공 사이트에서 비행편을 검색하는 작업을 큐에 추가한다면, 다수의 작업자들 중 하나가 이 작업을 수행합니다.
Celery
는 작업을 조직하고 처리하기 위해 메시지를 보내고 받기 위한 메시지 전송 방식이 필요합니다. RabbitMQ
와 Redis
는 Celery에서 사용되는 브로커(메시지 중계기) 전송 방식으로, 다음과 같이 작동합니다.
RabbitMQ
이 브로커는 메시지 큐 시스템으로, 작업자(worker)와 클라이언트 간의 효율적인 통신을 도와줍니다. 작업을 요청하는 클라이언트는 메시지를 큐(Queue)에 넣고, 작업자는 이 큐에서 메시지를 가져와 작업을 수행합니다. RabbitMQ는 메시지를 안정적으로 전달하고 저장할 수 있는 기능을 제공하여 데이터의 무결성을 보장합니다.
설치
sudo apt-get install rabbitmq-server
docker run -d -p 5672:5672 rabbitmq
명령이 완료되면, Starting rabbitmq-server: SUCCESS
이런 메시지를 볼 수 있습니다. 브로커(RabbitMQ)는 이미 실행 중이며 메시지를 이동할 준비가 되어 있습니다.
Redis
Redis
는 빠르고 가벼운 메모리 기반 데이터 스토어로, Redis를 사용하면 작업 큐와 메시지 브로커로 사용하여 메시지를 신속하게 처리할 수 있습니다. Redis는 분산 환경에서도 잘 작동하며, 메모리를 활용하여 높은 성능을 제공합니다. Redis는 갑작스러운 종료나 정전이 발생할 경우 데이터 손실에 더 취약합니다.
docker run -d -p 6379:6379 redis
pip install celery
프로젝트 디렉토리 레이아웃은 아래와 같습니다.
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
[ init.py ]
from .celery import app as celery_app # 이 코드는 Django 프로젝트 내부의 celery.py 또는 celery와 같은 파일에서 정의된 Celery 인스턴스(app 객체)를 가져옵니다. = Django와 Celery를 연동하는 단계
# 그런 다음, shared_task 데코레이터를 이용하여 Celery가 수행할 작업들을 import할 필요 없이 가져올 수 있습니다.
__all__ = ('celery_app',)
가장 먼저 필요한 것은 Celery 인스턴스입니다. Celery 애플리케이션이라고 부르거나 줄여서 앱이라고 부릅니다. 이 인스턴스는 작업 생성, 작업자 관리 등 Celery에서 수행하려는 모든 작업의 시작점으로 사용되므로 다른 모듈에서 이를 가져올 수 있어야 합니다.
[ settings.py ]
CELERY_BROKER_URL = 'amqp://guest@localhost:5672//' # Celery와 같은 작업 대기열 시스템에서 작업을 비동기적으로 처리하기 위해 활용되는 메시지 브로커입니다.
CELERY_RESULT_BACKEND = 'rpc://' # 정의한 url은 RabbitMQ 메시지 브로커 서버에 연결하는데 사용되는 URL입니다.
# Celery 작업의 결과를 저장하는 백엔드 저장소를 설정하는 데 사용되는 환경 변수 또는 설정 옵션입니다. 이 설정은 Celery가 비동기 작업을 처리하고 작업의 결과를 저장하는 방법을 지정합니다.
CELERY_RESULT_EXPIRES = 3600 # rpc://로 설정하면 작업 결과가 메시지 브로커에 저장되며 작업 수행자는 해당 결과를 검색합니다.
# 작업 결과를 설정된 시간(초) 동안 보관합니다.
import os
from django.conf import settings
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'facebug_back_end.settings') # Django 프로젝트의 설정 모듈을 지정하기 위한 코드입니다. Celery 설정 파일인 celery.py에서 Django 설정을 참조하여 Celery 설정을 Django와 함께 사용하기 위해 추가해줍니다.
app = Celery('project_name', broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND, include=['user.tasks']) # ask 모듈을 지정하여 Celery가 해당 모듈에서 task를 찾고 처리할 수 있도록 하고 작업을 백그라운드에서 진행할 수 있도록 메시지 브로커와 작업 결과 저장 백엔드 설정을 적용한 Celery 인스턴스(app)을 생성합니다.
app.config_from_object('django.conf:settings', namespace='CELERY') # Django 설정에서 이름이 'CELERY_'로 시작하는 Celery 설정을 가져오는 경로를 지정합니다.
app.conf.update(result_expires=settings.CELERY_RESULT_EXPIRES,) # Celery 애플리케이션의 설정 중 'result_expires' 옵션을 Django 설정 파일(settings.py)에서 가져온 값으로 업데이트하는 부분입니다. 'result_expires' : Celery 작업 결과가 저장될(유효한) 시간을 지정합니다.
if __name__ == '__main__': # 현재 스크립트가 직접 실행될 때 Celery 애플리케이션을 시작합니다. Celery 애플리케이션을 시작하면 이 애플리케이션은 백그라운드에서 작업을 처리할 준비가 됩니다.
app.start()
app.autodiscover_tasks() # Django에서 Celery 작업 모듈을 자동으로 찾고 등록하는 메서드입니다. 이 메서드를 사용하면 Celery 애플리케이션에 대한 작업 모듈을 명시적으로 지정하지 않고도 Django 애플리케이션 내에서 사용 가능한 모든 작업을 자동으로 등록할 수 있습니다.
from celery import shared_task
from django.core.mail import send_mail
@shared_task # '@shared_task' : Django에서 Celery 작업을 정의하는 데 사용되는 데코레이터입니다. Celery를 통해 백그라운드에서 실행되어야 하는 비동기 작업을 만들 때 사용됩니다.
def send_verification_email(user_id, verification_url, recipient_email): # 이 작업은 Celery worker에서 실행될 때 백그라운드에서 비동기로 실행됩니다.
subject = '이메일 확인 링크'
message = f'이메일 확인을 완료하려면 다음 링크를 클릭하세요: {verification_url}'
from_email = 'hyeonwoongjang01@gmail.com'
print(f'subject:{subject}')
send_mail(subject, message, from_email, [recipient_email])
@shared_task
데코레이터tasks.py
에 정의되어 있을 것입니다. 이 앱은 프로젝트 자체에 의존할 수 없기 때문에 앱 인스턴스를 직접 가져올 수도 없습니다.@shared_task
데코레이터를 사용하면 구체적인 앱 인스턴스 없이 작업을 생성할 수 있습니다.
작업 상태를 추적하려면 Celery가 상태를 어딘가에 저장하거나 보내야 합니다.
상태
Celery는 작업의 현재 상태를 추적하고 이를 결과 백엔드에 저장합니다. 상태 정보와 결과는 결과 백엔드에 저장되며, 기본적으로는 RabbitMQ
, Redis
, 또는 다른 지원되는 메시지 브로커를 사용하여 관리됩니다. 결과 백엔드에 저장되는 정보에는 다음과 같은 내용이 포함될 수 있습니다
결과 백엔드
작업을 추적하고 싶거나 반환 값이 필요한 경우 Celery는 나중에 검색할 수 있도록 상태를 어딘가에 저장하거나 보내야 합니다. SQLAlchemy/Django ORM
, Memcached
, RabbitMQ/QPid( )
및 Redis
중에서 선택할 수 있는 여러 내장 결과 백엔드가 있습니다.
RPC 결과 백엔드(RabbitMQ/QPid)
RPC 결과 백엔드( rpc:// )는 실제로 상태를 저장 하지 않고 메시지로 보내기 때문에 특별합니다. 이는 결과가 한 번만 검색될 수 있고 작업을 시작한 클라이언트에 의해서만 검색될 수 있다는 점에서 중요한 차이점입니다. 메시지는 기본적으로 일시적(비지속적)이므로 브로커가 다시 시작되면 결과가 사라집니다.
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
Django 프로젝트의 데이터베이스에 Celery 결과를 저장하고 싶은 경우
django-celery-results
라이브러리를 설치합니다
pip install django-celery-results
settings.py
에 설치한 라이브러리 추가
INSTALLED_APPS = (
...,
'django_celery_results',
)
데이터베이스 마이그레이션을 수행하여 Celery 데이터베이스 테이블을 생성
python manage.py migrate django_celery_results
django-celery-results
백엔드를 사용하도록 Celery를 구성
[ settings.py ]
CELERY_RESULT_BACKEND = 'django-db'
# 캐시 백엔드의 경우
CELERY_CACHE_BACKEND = 'django-cache'
# django의 CACHES 설정에 정의된 캐시를 사용할 수도 있습니다.
CELERY_CACHE_BACKEND = 'default'
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
'LOCATION': 'my_cache_table',
}
}
Celery 프로그램을 실행하여 Celery 작업을 수행할 작업자(Worker)을 실행시킵니다.
celery -A project_name worker --loglevel=INFO
가동하면 아래와 같은 결과가 나옵니다.
-------------- celery@DESKTOP-L81ERLR v5.3.4 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2023-10-25 01:29:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: facebug_back_end:0x180b8d9ba00
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 12
(eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery
exchange=celery(direct) key=celery
[tasks]
. user.tasks.send_verification_email
만약, 작업자(Worker)는 연결되었지만, Celery 작업을 인식하지 못한다면 Window환경 때문일 수도 있습니다. 그럴 때는, 아래 명령어로 실행하면 정상 실행됩니다.
pip install eventlet
celery -A <module> worker -l info -P eventlet
Windows
는 Unix
계열의 운영체제와는 다른 프로세스 처리 모델을 사용하므로 Celery와 같은 비동기 작업을 관리하기에 일부 문제가 발생할 수 있습니다.
eventlet
은 비동기 이벤트 루프 라이브러리로, Celery와 함께 사용하여 Windows 환경에서 Celery 작업자(worker)를 실행하는 데 도움이 될 수 있습니다. eventlet을 설치하고 -P eventlet
플래그를 사용하여 Celery 작업자를 시작하면 Windows에서의 Celery 실행 문제를 완화할 수 있습니다.
이렇게 하면 Celery 작업자가 시작되고 등록된 작업을 실행할 수 있습니다.
작업을 호출하려면 delay()메서드와 apply_async()메서드를 사용할 수 있습니다.
from .tasks import send_verification_email # 작업을 정의한 모듈을 가져와야 합니다.
class RegisterView(APIView):
def post(self, request):
"""사용자 정보를 받아 회원가입 합니다."""
serializer = UserSerializer(data=request.data, context={'profile_img':request.FILES})
if serializer.is_valid():
user = serializer.save()
# 이메일 확인 토큰 생성
token = default_token_generator.make_token(user)
uid = urlsafe_base64_encode(force_bytes(user.pk))
# 이메일에 인증 링크 포함하여 보내기
verification_url = f"http://127.0.0.1:8000/verify-email/{uid}/{token}/"
send_verification_email.delay(user.id, verification_url, user.email) # 'delay' : Celery 작업을 예약하여 나중에 실행되도록 하는 메소드입니다. 'delay' 메소드를 호출하면 작업이 백그라운드에서 비동기적으로 실행됩니다.
return Response({'message':'회원가입 성공'}, status=status.HTTP_201_CREATED)
else:
return Response({'message': serializer.errors}, status=status.HTTP_400_BAD_REQUEST)
from .tasks import send_verification_email # 작업을 정의한 모듈을 가져와야 합니다.
send_verification_email.apply_async(args=(user.id, verification_url, user.email), countdown=10, priority=2)
# countdown 및 priority와 같은 .apply_async 메서드의 옵션을 사용하여 작업을 스케줄링하거나 우선 순위를 설정할 수 있습니다.
이렇게 하면 send_verification_email
작업을 10초 후에 실행하고 우선 순위를 2로 설정합니다.
이렇게 작업 큐와 Celery를 사용하여 Django 프로젝트에서 비동기 작업을 효과적으로 처리할 수 있는 방법에 대해 알아봤습니다.
take cue와 celery이 무엇인지 어떻게 작동하는지에 대한 정리를 잘 해주셨네요👍