Celery 겉핥기 (with redis, django)

렁호·2023년 8월 15일
0
post-thumbnail

python 기반의 서비스 운영 환경에서 참 많이 사랑받는 celery에 대해서 알아보자. (Broker는 redis를 사용)

Celery란

분산 메시지 전달 기반 비동기 Task queue

장점

  • 안정성
    • 연결 유실이나 실패가 일어날 경우 자동 재시작
  • 확장성
    • 사용자가 자유롭게 설정할 수 있는 영역이 많음
  • 속도
    • 최적화 설정이 되어있을 경우 최고 분당 수백만개의 작업 처리 가능

구성 요소

  • Producer
    • Task를 생산하여 Broker로 전달하는 주체 (Celery App)
  • Broker
    • Producer 를 통해 Task를 받고 Consumer에게 Task를 전달하는 주체
    • ex) RabbitMQ, Redis, Amazone SQS ...
  • Consumer (Worker)
    • Task를 받아 실제 작업을 처리하는 주체 (Celery Worker)

Extension

  • Celery beat
    • 작업 스케줄링 라이브러리
    • Task를 Cron과 같이 스케쥴 패턴대로 처리할 수 있음
    • 배치 작업등에도 사용 가능

Work Process

  1. Client 에게 요청을 받음
  2. 비동기로 태스크를 Broker로 전달
    • redis 스토리지에 태스크별 큐에 태스크에 대한 정보를 직렬화 하여 Rpush함 (type: list)
  3. Main Worker Process가 태스크별 큐를 확인하여 데이터가 존재할 경우 Lpop 하여 worker process에게 전달
  4. Worker process가 작업을 처리
  5. 결과를 Broker에 전달(Optional)

How To Use

django 프로젝트 셋팅

루트 디렉토리: manage.py 가 존재하는 프로젝트 최상위 경로

프로젝트 디렉토리: wsgi.py가 존재하는 프로젝트 경로


  1. Celery 설치
    • Broker로 redis를 사용할 예정임으로 한번에 redis 의존성까지 설치
pip install 'celery[redis]'

  1. settings 설정
# settings.py

# Redis
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = os.getenv('REDIS_PORT')
REDIS_PORT_SYSTEM = os.getenv('REDIS_PORT_SYSTEM')
REDIS_PW = os.getenv('REDIS_PW')

# Celery
CELERY_BROKER_URL = f'redis://:{REDIS_PW}@{REDIS_HOST}:{REDIS_PORT}'
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

  1. celery app 설정
# 루트디렉토리/프로젝트디렉토리/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SQApi.settings')
app = Celery('app_name')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() 
  • autodiscover_tasks(): 프로젝트내 모든 celery shared_task 데코레이터가 적용된 테스크 수집
    • 안 쓸 경우 Celery(include=[’app.tasks’, ‘app2.tasks’]) 식으로 지정 해주거나 app 별로 테스크를 관리해야함
  • config_from_object() 의 namespace 파라미터는 prefix를 의미, CELERY 로 시작하는 설정 값을 가져옴

  1. 프로젝트 init 수정
# 루트디렉토리/프로젝트디렉토리/__init__.py

from .celery import app as celery_app

__all__ = ('celery_app',)
  • django가 app이 시작되면서 celery 앱을 로드 하도록 설정

Task 함수 작성 및 Task 생성

  1. 함수 작성
# <>/tasks.py
from celery import shared_task

@shared_task
def do_something(x, y):
	...
	return x + y
  • shared_task 데코레이터를 사용
    • shared_task(bind=True) 로 설정할 경우 class의 인스턴스 메소드로 사용 가능

  1. Task 생성
from apps.tasks import do_something
do_something.delay(5, 5)
  • 함수의 delay 메소드 호출 하여 delay 메소드의 파라미터로 함수 파라미터 전달

Worker 실행

  • Broker 정보와 DB등을 환경 변수에서 가져옴으로 실행 전 환경변수 초기화 필수
celery -A SQApi worker -l INFO --autoscale 2,5
  • -A SQApi: 실행할 셀러리 어플리케이션 설정
  • -l INFO: 로그 레벨 설정
  • --autoscale n,m: 최소 n개 최대 m개의 프로세스 사용
    • 최소 자식 프로세스 개수를 유지하다가 최소 개수를 넘어선 요청이 들어오면 요청에 맞춰 최대 설정 값까지의 프로세스까지 늘린 후 일정 시간이 지나면 다시 최소 프로세스로 스케일 조정
  • —concurrency n: 하나의 워커에서 최대 n개의 작업만큼 동시 작업
    • 높게 설정하면 서버에 과부하 심함
  • 그 외 worker 설정 공식 문서 참조

종료, 재시작

  • 종료의 경우 kill 명령어를 통해 프로세스 자체를 죽이는 방식으로 종료
    • pkill 명령어를 통해 프로세스 이름으로 제거 가능
  • 재시작은 종료 후 다시 시작
    • 워커 프로세스가 죽어 있더라도 broker에 task가 남아있다면 재시작 하면서 broker의 task를 일괄적으로 작업함

Daemon Service (systemd 사용)

실 운영환경에서 효율적으로 Celery 워커를 동작시기키 위해서 daemon 서비스로 등록을 해서 사용하자

  1. 환경 변수 파일 생성
CELERYD_NODES="w1"
CELERY_BIN=가상환경내 Celery 파일
CELERY_APP="SQApi"
CELERYD_MULTI="multi"
CELERYD_OPTS="--concurrency=2 --autoscale=4,8"
CELERYD_PID_FILE="/home/ubuntu/logs/celery_%n.pid"
CELERYD_LOG_FILE="/home/ubuntu/logs/celery_%n%I.log"
CELERYD_LOG_LEVEL="INFO"
  1. celery service file 작성
## /etc/systemd/system/celery.service

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=user name
Group=group name
EnvironmentFile=환경변수파일
WorkingDirectory=루트디렉토리
ExecStart=/bin/bash -c '${CELERY_BIN} multi start ${CELERYD_NODES} \
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
ExecStop=/bin/bash -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \
  --pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/bash -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
Restart=always

[Install]
WantedBy=multi-user.target
  1. 서비스 등록 및 실행
sudo systemctl enable celery
sudo systemctl daemon-reload

sudo systemctl start celery

sudo systemctl status celery

모니터링 & 로깅

  • 주로 Flower 라는 python 라이브러리를 사용
    • 사용법이 매우 간단하나 내용이 은근 많아서 생략..구글링 고고

사용시 주의사항

  • Celery에서 task를 브로커에게 전달할 때 json으로 직렬화 하여 전달하는데 객체에 따라 직렬화 불가능 할 수가 있음, 이럴 경우 사용자가 직렬화 하여 파라미터로 전달 하거나 직렬화 가능한 데이터 객체를 만들어서 전달
  • Redis(Broker) 가 종료될 경우 task가 유실 됨
    • Broker 종료 전 남은 작업 존재하는지 확인 필요해보임
  • 결과가 필요 없는 태스크에 대해서 ignore_result=True 설정을 하지 않을 경우, 결과가 필요 없음에도 브로커에 저장되어 불필요한 자원을 낭비할 수 있음
profile
식욕, 수면욕, 성욕 만땅

1개의 댓글

comment-user-thumbnail
2023년 8월 15일

훌륭한 글 감사드립니다.

답글 달기