[Django] Django에서 Celery로 비동기 작업 구현하기

김동욱·2024년 4월 19일

Django

목록 보기
5/6
post-thumbnail

회원가입에서 이메일 인증 기능을 구현했다. Gmail SMTP 사용하여 이메일로 인증번호를 보내는데, 이메일 전송 api 응답 시간이 약 2 ~ 3초가 소요된다. 하지만 가끔 아래와 같아 꽤 오래 걸릴 때가 있었다.

이 처럼 처리가 오래 걸리는 api를 동기 방식으로 구현하면 처리가 끝날 때 까지 기다리면서 다른 작업을 할 수 없다. 리소스의 효율적 사용하기 위해서 비동기 방식으로 구현해야 한다. Django에서 Celery를 사용하여 비동기 작업을 구현할 수 있다.

Celery에 대해 살펴보자

Celery란

Celery란 분산 메시지 전달에 기반을 둔 비동기 태스크 큐이다. Celery를 사용하면 오래 걸리는 작업을 비동기적으로 처리할 수 있다. 사용자는 작업을 요청하고 바로 응답을 받을 수 있으며, 실제 작업은 백그라운드에서 처리된다.

Celery의 특징

  • 태스크가 여러 서버에 전달되고 병렬적으로 처리
  • 분산된 각각의 워커는 태스크를 받아서 서로 다른 머신에서 실행 가능
  • 클라이언트(Django)가 워커에 보낸 메세지를 브로커(Redis 등) 중간에서 중재
  • 완료된 태스크에 대한 상태를 저장
  • 반복적인 스케쥴링 작업 가능
  • 중간에 이상이 발생하더라도 끝까지 태스크를 수행할 수 있게 설계
  • 파이썬 웹 프레임워크와 연동이 쉬움
  • 복잡한 워크플로우도 지원함
  • Flower와 같은 모니터링 툴로 실시간 모니터링 가능

Celery를 활용한 비동기 작업 동작 방식

사용자가 Django 애플리케이션에 요청을 보낸다. Django 애플리케이션은 요청에 대한 응답을 즉시 반환한다. 이 때, 백그라운드에서 처리해야 할 작업을 Celery 태스크로 정의한다. Celery는 이러한 태스크를 생성하여 메시지 브로커(Redis, RabbitMQ)에 저장한다. Celery 워커는 메시지 브로커에서 이 태스크를 가져와 처리한다. 태스크가 실행을 완료하면, Celery는 결과를 데이터베이스에 저장하거나 필요에 따라 다른 작업을 수행한다.

Django에서 Celery 사용하기

Celery를 사용기 위해선 Celery 관련 라이브러리와 메시지 브로커(Redis 또는 RabbitMQ)를 설치해야 한다. Redis와 RabbitMQ에 대해 간단하게 비교해보자면, Redis는 상대적으로 덜 안정적이고 속도가 빠른반면 RabbitMQ는 상대적으로 안정적이고 속도가 느리다. 현 프로젝트에 redis 관련 설정이 되어있기 때문에 redis를 메세지 브로커로 선택했다.

다음의 명령어로 Celery를 설치하자.

pip install 'celery[redis]'

Django 프로젝트에서 Celery를 사용하려면 먼저 Celery 라이브러리의 인스턴스("앱"이라고 함)를 정의해야 한다. 프로젝트 디렉토리(settings.py가 존재하는 디렉토리)에 celery.py 모듈을 다음과 같이 생성한다.

[프로젝트 디렉토리/celery.py]

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

그런 다음 이 앱을 __init__.py 모듈로 가져와야 한다. 이렇게 하면 Django가 시작될 때 앱이 로드되어 @shared_task 데코레이터가 앱을 사용할 수 있게 된다.

[프로젝트 디렉토리/__init__.py]

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

설정 파일에 Celery관련 환경변수 선언을 해야 한다. 다음과 같은 포맷으로 진행한다. 공식 문서를 통해 사용 가능한 환경 변수 확인을 할 수 있다.

[config/settings.py]

CELERY_TIMEZONE = "Australia/Tasmania"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

Celery로 이메일 전송 로직 비동기 방식으로 리팩토링하기

기존 코드 리팩토링하기

기존의 회원가입/비밀번호 변경 로직에서 인증 번호 발송 로직을 동기 방식으로 구현했다. 이를 Celery를 활용하여 비동기 방식으로 리팩토링 해보자.

우선 위에서 언급한 설정을 모두 진행했다고 가정한다. 필자 같은 경우는 account/views.py 모듈 상단에 이메일 전송 메서드를 구현하여 회원가입, 비밀번호 변경 각각의 상황에서 해당 메서드를 호출하는 방식으로 구현했다.

이메일 전송 메서드를 app 내부의 task.py 모듈을 생성하고 아래와 같이 작성한다.

[account/task.py]

import logging

from django.core.mail import send_mail

from config.celery import app


logger = logging.getLogger('django')


@app.task(name="worker")
def send_auth_code_to_email(type: str, receiver: str, code: int) -> None:
    try:
        email_subject = "[ROCCIA 901] 본인 확인 인증 번호 입니다."
        email_body = f"{type}을 위해 전송된 이메일입니다.\n 본인확인을 위해 인증 번호 [{code}]를 입력해 주세요."

        send_mail(subject=email_subject, message=email_body, from_email="ROCCIA 901", recipient_list=[receiver])
    except Exception as e:
        logger.error(f"{type} 인증 이메일 전송 실패 - {receiver}, 에러: {str(e)}")

기존의 동기 작업은 이메일 발송 실패 시 커스텀 예외를 발생시켰다. 하지만 비동기 방식은 이메일 전송 작업이 실패하더라도 사용자는 성공했다는 응답을 받게된다. 사용자 경험에 비중을 둬서 이처럼 구현하게 됐다. 따라서 예외 발생 시 로깅 처리를 하도록 했다.

아래의 두 데코레이터는 태스크를 정의할 때 사용되지만, 각각의 사용 목적과 적용 범위에 차이가 있다.

@app.task : 특정 Celery 애플리케이션 인스턴스에 태스크를 등록한다.
@shared_task: 태스크를 애플리케이션 인스턴스에 종속시키지 않고, 다양한 설정이나 여러 Django 앱에서 사용될 수 있도록 할 때 사용한다.

이처럼 등록한 태스크를 호출하기 위해선 delay() 메서드를 호출해야 한다.

send_auth_code_to_email.delay("비밀번호 변경", receiver, code)

Celery관련 환경 변수는 다음과 같이 추가했다.CELERY_BROKER_URL은 브로커로 사용할 Redis의 주소이다. 비동기 작업을 Celery로 넘기기 전에 Redis에 저장한다. Celery가 작업을 처리한 결과를 CELERY_RESULT_BACKEND에 저장한다. CELERY_RESULT_BACKEND 또한 Redis를 사용했다.

CELERY_BROKER_URL = env("CELERY_BROKER_URL", default=""),
CELERY_RESULT_BACKEND = env("CELERY_RESULT_BACKEND", default="")
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

Celery 워커를 실행하기 위해선 Django 실행 이후 다음의 명령어를 사용한다. 도커 컴포즈로 따로 컨테이너화하여 실행할 수도 있다.

celery -A [celery.py가 존재하는 디렉토리 이름] worker --loglevel=info

결과 살펴보기

처음 결과 23초에 비해 0.053초로, 엄청난 성능 향상을 보인다. Celery가 동작하지 않으면 메일 전송이 정상적으로 처리되지 않지만 성공 응답을 받는다. Celery를 실행하면 메세지 브로커에 쌓여있던 작업들이 모두 실행된다.


참고 자료

https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
https://www.youtube.com/watch?v=MtCa7A3_JGs

profile
안녕하세요! 질문과 피드백은 언제든지 환영입니다:)

0개의 댓글