django-admin startproject mq_celery .
: Celery 를 사용해볼 장고를 띄움
pip install celery
: celery 설치
brew install rabbitmq
: 브로커 역할을 해줄 토끼mq
python manage.py startapp app1
mq_clerey/settings.py
에서 app1
을 추가해준다.
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app1' # 이거 추가 안해줬다가 task에 잡히질 앉는 오류를 범했었다.
]
mq_celery/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mq_celery.settings') # celery instance 생성 전에 설정해줘야함
app = Celery('mq_celery') # celery instance 가 생성됨
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # 이러면 등록된 앱들의 task를 오토로 챙겨온다.
app.autodicover
에는 어떤 package, 어떤 파일에서 테스크를 가져올 지 인자로 설정할 수 있다.
force (bool) – By default this call is lazy so that the actual auto-discovery won’t happen until an application imports the default modules.
Forcing will cause the auto-discovery to happen immediately.
이 외에도 bool 속성으로 위의 내용을 설정할 수 있다는데, 내가 맞게 이해한건지는 모르겠다. autodiscover가 각 app들이 import 되고 나서 실행되는데 이걸 각 app들이 import 되기 전에 먼저 autodiscover 할 수 있다는 것 같다.
app1/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
를 사용하면 app.task
와 같이 app에 종속적이지 않게 사용할 수 있다. 이로 인해 app 을 import해오지 않아도 된다.
celery -A mq_celery worker -l info
: celery 실행 tasks
에 테스크가 잡힌다.
임의로 테스크를 넣어주었다. 오타는 덤.
[2022-04-06 13:37:51,037: INFO/MainProcess] Task app1.tasks.add[677b0c4e-ab18-4a3c-a076-6bb0d5f20678] received
[2022-04-06 13:37:51,041: INFO/ForkPoolWorker-8] Task app1.tasks.add[677b0c4e-ab18-4a3c-a076-6bb0d5f20678] succeeded in 0.0006483820034191012s: 8
[2022-04-06 13:39:11,150: INFO/MainProcess] Task app1.tasks.add[1fcdab19-11fc-4c5f-8cf9-87df5525d4b4] received
[2022-04-06 13:39:22,370: INFO/ForkPoolWorker-8] Task app1.tasks.add[1fcdab19-11fc-4c5f-8cf9-87df5525d4b4] succeeded in 0.000147557002492249s: 6
[2022-04-06 13:39:32,711: INFO/MainProcess] Task app1.tasks.add[4bddfcb3-d309-41c8-9d41-03cc94ac1793] received
[2022-04-06 13:39:34,297: INFO/ForkPoolWorker-8] Task app1.tasks.add[4bddfcb3-d309-41c8-9d41-03cc94ac1793] succeeded in 0.00013701198622584343s: 6
결과는 잘 나오는군.
이메일 보내는 처리를 clerey task를 이용해본다.
# task.py
from celery import shared_task
from celery.utils.log import get_task_logger
from .email import send_review_email # 이메일 전송하는 코드를 만들어놓고 import 하였음
logger = get_task_logger(__name__)
@shared_task(name="send_reviw_email_task")
def send_review_email_task(name, email, review):
logger.info("Sent review email!!")
return send_review_email(name, email, review) # celery shared_task 로 wrapping 함
# form.py
from django import forms
from app2.tasks import send_review_email_task # 위 task 로 지정한 것을 import
class ReviewForm(forms.Form):
name = forms.CharField(
label='Firstname', min_length=4, max_length=50, widget=forms.TextInput(
attrs={'class': 'form-control mb-3', 'placeholder': 'Firstname', 'id': 'form-firstname'}))
email = forms.EmailField(
max_length=200, widget=forms.TextInput(
attrs={'class': 'form-control mb-3', 'placeholder': 'E-mail', 'id': 'form-email'}))
review = forms.CharField(
label="Review", widget=forms.Textarea(attrs={'class': 'form-control', 'rows': '5'}))
def send_email(self): # send 하게 되면 celery task로 enqueue 됨
send_review_email_task.delay(
self.cleaned_data['name'], self.cleaned_data['email'], self.cleaned_data['review'])
깃(https://github.com/jungbumwoo/mq_celery)