지난 포스트를 작성하면서 Celery
가 무엇에 쓰는 물건인지를 간략하게 개요 성격으로 알아보았다. 이번 포스트에서는 실제 Python 코드를 작성하면서 Celery 사용법을 익히고 각 브로커에 대해 더 자세하게 다룰 것이다.
Celery 공식 문서를 번역하여 작성하였다. 의욕이 앞서다보니 의역, 오역이 난무할 수 있다.
이번 튜토리얼을 통해 Celery 기초 사용법에 대해 배울 것이다.
Celery는 메시지 송수신에 대한 솔루션이 필요하다. 일반적으로 메시지 브로커
라는 별도의 서비스 형태로 제공된다. 메시지 브로커에 대한 선택지가 몇 개 있다.
RabbitMQ
는 기능적으로 완전하고 안정적이며 내구성이 뛰어나고 설치가 쉽다. 프로덕션 환경에서 최고의 선택지이다. 자세한 정보는 Using RabbitMQ에서 확인할 수 있다.
$ sudo apt-get install rabbitmq-server
$ docker run -d -p 5672:5672 rabbitmq
$ brew install rabbitmq
설치가 완료되면 브로커는 백그라운드에서 실행되고 있고 아래의 메시지를 확인할 수 있을 것이다.
Starting rabbitmq-server: SUCCESS.
macOS 환경은 약간 다르다. 별도로 커맨드를 실행해야 백그라운드로 올릴 수 있다.
(필자의 경우 10.15.2 Catalina)
...
To have launchd start rabbitmq now and restart at login:
brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
rabbitmq-server
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
Redis
또한 완전한 기능을 갖추고 있지만, in-memory
구조이기 때문에 갑작스러운 종료 또는 전원 차단 상황에서 데이터 손실에 더욱 취약하다. 자세한 정보는 Using Redis에서 확인할 수 있다.
$ docker run -d -p 6379:6379 redis
위의 브로커들 외에도 Amazon SQS를 포함하여 다른 브로커들도 존재한다.
PyPI에 올라와 있기 때문에 pip
또는 easy_install
커맨드로 설치할 수 있다.
$ pip install celery
제일 먼저 Celery 인스턴스를 생성하자. 이 인스턴스는 태스크 생성, 워커 관리 등 Celery에서 수행하고자 하는 모든 일들에 대한 엔트리포인트로 사용되므로 다른 모듈에서 import가 가능해야 한다.
이 튜토리얼에서는 단 하나의 모듈에 모든 걸 담을 것이지만 더 큰 프로젝트의 경우 Celery 전용 모듈을 사용하고 싶어할 것이다.
tasks.py
모듈을 만들자.
# tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
첫 번째 인자는 현재 모듈의 이름이다. 메인(__main__) 모듈에 태스크들이 정의될 때 이름이 자동으로 생성될 수 있도록 할 경우에만 필요하다.
두 번째 인자는 사용하고자 하는 메시지 브로커의 URL이다. 여기서는 디폴트 옵션인 RabbitMQ
를 사용하고 있다.
RabbitMQ의 URL은 amqp://localhost
, Redis
은 redis://localhost
형식으로 URL을 사용한다.
worker
인자를 넣어 워커가 위에서 작성한 프로그램을 실행하도록 할 수 있다.
$ celery -A tasks worker --loglevel=info
celery@YOONui-MacBook-Pro.local v4.4.1 (cliffs)
Darwin-19.2.0-x86_64-i386-64bit 2020-03-07 23:53:43
[config]
.> app: tasks:0x107af0550
.> transport: amqp://guest:**@localhost:5672//
.> results: disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2020-03-07 23:53:44,025: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-07 23:53:44,043: INFO/MainProcess] mingle: searching for neighbors
[2020-03-07 23:53:45,085: INFO/MainProcess] mingle: all alone
[2020-03-07 23:53:45,109: INFO/MainProcess] celery@YOONui-MacBook-Pro.local ready.
현재 물려 있는 태스크의 주소값, 메시지 브로커에 대한 정보들이 출력되며 Celery가 준비되었다고 말해주고 있다.
프로덕션 환경에서는 워커를 데몬으로 백그라운드에서 실행하기를 원할 것이다. 그러기 위해서는 supervisord
와 같은 플랫폼별 데몬 툴이 필요하다. macOS에서는 launchd
이다.
도움말이 필요하다면 아래 커맨드로.
$ celery worker --help
$ celery help
delay()
메소드로 태스크를 호출할 수 있다. 이 메소드는 태스크 호출을 ㅎ과적으로 제어할 수 있는 apply_asyncx()
메소드에 대한 바로가기이다.
>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: 8c9fec82-a48f-47c0-a3e2-6574e35dd8f9>
>>>
워커의 콘솔 출력에서 알 수 있듯이 앞에서 시작했던 워커에 의해 태스크가 처리되었다.
태스크를 호출하면 AsyncResult
인스턴스가 리턴된다. AsyncResult
인스턴스는 태스크의 상태를 체크하거나, 태스크가 완료될 때까지 기다리거나, 리턴값을 얻거나, 태스크가 실패할 경우 예외처리와 traceback을 얻는 데에 사용할 수 있다.
기본적으로 호출 결과는 사용되지 않는다. 원격 프로시저 호출을 수행하거나 데이터베이스에서 태스크 결과를 추적하려면 결과 백엔드(?)를 사용하도록 Celery를 설정해야 한다. 다음 섹션에서 알아보자.
https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps