Celery + Redis

Nam Eun-Ji·2020년 11월 27일
4
post-custom-banner

celery를 이해하기 위해 간단한 예제를 실행해보았다.
qna를 작성하면 qna 내용이 메일로 전달되는 기능이다.
celery를 이용하기 전에는 return될 때까지 약 8초정도 걸리는 작업이 1초도 안 걸려 끝났다.




Celery

python으로 작성된 비동기 작업 큐(Asynchronous task queue/job queue)

웹 서비스를 하면서 응답을 받기 오래 걸리는 작업이 종종 있다. 그럴 경우 사용자는 응답을 받기위해 오랜 시간을 기다려야 한다. 보통 웹 서비스에서 응답 시간은 서비스의 생명과 직결되므로 비동기로 작업을 처리하게 넘기고 바로 응답을 하는 경우가 많다. celery는 그 작업을 할 수 있도록 도와주는 파이썬 프레임워크로 보통 이런 프레임워크를 worker라고 부른다.

Celery는 task(작업)를 broker에게 전달하면 하나 이상의 워커가 이를 처리하는 구조이다.
때문에 celery를 사용하기 위해서는 작업 요청을 받을 브로커가 필요하다. 여기서 브로커란 요청한 작업을 담아두는 큐이고 담아둔 요청을 여러개의 worker에게 적절히 분배한다.

  • Celery는 보통 클라이언트와 워커 사이를 중재하는 브로커를 사용하여 메시지를 통해 통신한다.
  • task를 시작하기 위해 클라이언트는 큐에 메시지를 추가하고, 브로커는 워커에게 메시지를 전달한다.
  • Celery 시스템은 여러 워커와 브로커들로 구성될 수 있고, 고가용성 및 수평적 확장이 가능하다.
  • Celery는 메시지를 주고 받기 위해 메시지 전송이 필요하다.

비동기 작업의 사례, 메시지 브로커의 필요성

비동기 작업이 필수적인 곳이 있는데, 바로 은행 전산 시스템이다. 전국의 은행 지점에서 돈이 오고 가는 것이 전산적으로 어딘가의 중앙 시스템에 기록이 된다. 동시다발적으로 거래가 일어날 텐데, 전산 작업 중 일부가 충돌해서 문제가 발생한다면 은행 입장에서는 재앙과 같은 일이 된다. 이런 위험을 해소해주는 것이 메시지 브로커를 이용한 비동기 작업 큐를 구축하는 것이다.

queue라는 단어 자체가 컨베이어 벨트처럼 작업을 기다리는 줄을 의미한다. 모든 은행 지점에서 거래에 대한 전산 입력을 할 때마다 중앙 시스템의 메시지 큐에 순차적으로 작업이 등록시키고, 중앙 시스템은 큐에 등록된 작업을 차례대로 수행하면 충돌의 위험을 많이 줄일 수 있는 것이다. 이러한 메시지 큐 시스템과 여기에 작업을 전달해주는 시스템을 브로커라고 부르는 것이다.

task queue란?

  • 작업 대기열
  • 스레드 또는 머신에 작업을 분산시키기 위한 메커니즘.
  • task라는 하나의 작업단위를 입력으로 받는다.
  • 전담 워커 프로세스는 새로운 작업을 수행하기 위해 task queue를 지속적으로 모니터링한다.



예제 실습

1. Django 초기환경을 셋팅해준다.

참고 링크


2. Celery를 반영하기 전 기본 기능을 정의해준다.

해당 예제는 qna POST request를 받으면 해당 내용을 메일로 발송해주는 예제이다.

  1. 앱 생성 및 기본 셋팅
    해당 프로젝트는 qna 라고 생성하였다.
python manage.py startapp qna
# practice_celery/practice_celery/settings.py

INSTALLED_APPS = [
    ...,
    'qna',
]
# practice_celery/practice_celery/urls.py

from django.urls import path, include

urlpatterns = [
    path('qna', include('qna.urls'))
]
  1. GMail SMTP 서버를 이용하기 위한 설정
    Gmail SMTP 서버에 대한 내용은 이 링크를 참조하면 된다.
# practice_celery/practice_celery/settings.py

# email
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = "smtp.gmail.com"
EMAIL_HOST_USER = 'username@gmail.com'
EMAIL_HOST_PASSWORD = 'your-gmail-password'
EMAIL_PORT = 587
EMAIL_USE_TLS = True
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER

3. qna 기능 정의

아래와 같이 정의해주고 migrate 후 테스트를 진행해보면 mysql에 데이터가 들어가는 것과 약 8초 정도(환경마다 다를 수 있음) 후에 request에 적힌 이메일 주소로 메일이 간 것을 확인할 수 있다.

아래 코드는 여기서 자세히 확인할 수 있다.

# practice_celery/qna/models.py

from django.db import models

class Qna(models.Model):
    name = models.CharField(max_length=50)
    email = models.CharField(max_length=50, null=True)
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    is_send = models.BooleanField(default=False)

    class Meta:
        db_table = 'qna'
# practice_celery/qna/views.py

import json

from django.views import View
from django.core.mail import send_mail
from django.http import JsonResponse, HttpResponse
from django.core.validators import validate_email
from django.core.exceptions import ValidationError

from .models import Qna
from practice_celery.settings import DEFAULT_FROM_EMAIL


class QnaView(View):
    def post(self, request):
        data = json.loads(request.body)

        try:
            validate_email(data["email"])
            Qna(
                name=data["name"],
                email=data["email"],
                content=data["content"]
            ).save()

            send_mail(
                subject='test',
                message=data["content"],
                from_email=DEFAULT_FROM_EMAIL,
                recipient_list=[data["email"]],
                fail_silently=False
            )
            return HttpResponse(status=201)
        except ValidationError:
            return JsonResponse({"message": "INVALID_EMAIL"}, status=400)
# practice_celery/qna/urls.py

from django.urls import path
from .views import QnaView

urlpatterns = [
    path('', QnaView.as_view())
]

위 코드까지는 진행하게 되면 메일이 발송이 완료될 때까지 기다려야하기 때문에 다음 작업을 수행할 수 없다. 지금부터 celery를 이용하여 비동기 작업을 진행할 것이다.


4. Celery 설치

pip를 이용해 cerlry 모듈과 redis와의 연동을 위한 dependency를 한 번에 설치한다.

pip install 'celery[redis]'

5. Redis 설치

$ wget http://download.redis.io/redis-stable.tar.gz
$ tar xvzf redis-stable.tar.gz
$ cd redis-stable
$ make
$ redis-server # redis 실행
$ redis-cli ping # 정상 설치되었는지 확인
> PONG

PONG 메시지를 띄우면 설치 성공이다.


6. Celery 환경설정

자세한 설명은 Celery 공식문서를 참고하면 된다.

# practice_celery/practice_celery/settings.py

# celery
# 참고로 CELERY_BROKER_URL에 'redis://localhost:6379'를 했을 때는 적용이 되지 않았다.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# practice_celery/practice_celery/celery.py

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'practice_celery.settings')
app = Celery('practice_celery')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
# practice_celery/practice_celery/__init__.py

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ('celery_app',)

7. qna 기능 중 send_email 작업을 worker에게 전달할 업무로 따로 뺀다.

# practice_celery/qna/tasks.py

from django.core.mail import send_mail
from celery import shared_task

from practice_celery.settings import DEFAULT_FROM_EMAIL
from .models import Qna

@shared_task
def task_send_email(content, email, id):
    send_mail(
        subject='문의내용입니다.',
        message=content,
        from_email=DEFAULT_FROM_EMAIL,
        recipient_list=[email],
        fail_silently=False
    )
    qna = Qna.objects.get(id=id)
    qna.is_send = True
    qna.save()
    return None
# practice_celery/qna/views.py

import json

from django.views import View
from django.http import JsonResponse, HttpResponse
from django.core.validators import validate_email
from django.core.exceptions import ValidationError

from .models import Qna
from .tasks import task_send_email


class QnaView(View):
    def post(self, request):
        data = json.loads(request.body)
        try:
            validate_email(data["email"])
            qna = Qna(
                name=data["name"],
                email=data["email"],
                content=data["content"]
            )
            qna.save()

            task_send_email.delay(data["content"], data["email"], qna.id)
            return HttpResponse(status=201)
        except ValidationError:
            return JsonResponse({"message": "INVALID_EMAIL"}, status=400)

8. 테스트 진행

Redis, Django, Celery를 구동시킨 후 테스트해본다.

python manage.py runserver
redis-server

# 서버 정지
$ redis-cli shutdown
celery -A practice_celery worker -l info

위와 같이 코드가 변경되었을 때, 8초정도 걸리던 작업이 1초도 안 걸려 끝나는 것을 확인할 수 있었다. 물론 메일이 오는 속도는 비슷하다. 하지만 오래 걸리는 작업은 celery에게 맡기고 다른 작업을 수행할 수 있기 때문에 효과적이다.

변경된 전체 코드는 여기서 확인할 수 있다.




Celery WorkFlow

  1. @shared_task로 처리하고 싶은 일에 딱지를 붙인다.

  2. task_send_email 작업에 delay를 붙이면 Redis Backend에 기록이 저장된다.

  3. Redis는 Celery에게 일을 준다.

  4. 일을 받은 Celery는 task_send_email 작업을 시작한다.




참고사이트
https://whatisthenext.tistory.com/127
https://www.youtube.com/watch?v=b-6mEAr1m-A
https://dgkim5360.tistory.com/entry/python-celery-asynchronous-system-with-redis

profile
한 줄 소개가 자연스러워지는 그날까지
post-custom-banner

1개의 댓글

comment-user-thumbnail
2021년 3월 17일

잘보았습니다.

따라해봐서 진행을 해보는데,

현재 window10 환경에서 개발중입니다.

celery -A 앱이름 worker -l info 가 작동을 안해서 계속 찾아보니,
window10 에서는 --pool=prefork 설정이 안된다하여 --pool=solo 로 진행했습니다.

메일이 한번은 비동기로 처리가 되는데, 그 뒤로는 회원가입을해도 이메일이 발송이 되질않습니다.

redis 말고 rabbitMQ 를 브로커로 사용하니 일단 기능이 제대로 동작하긴 하는데,

혹시 제가 놓치고 있는 부분이 어디인지 알수있을까요.. redis 는 제일 처음 한번만 보내고 그뒤론 task를 받질 못하고 있습니다 ㅠㅠ

답글 달기