Celery 첫 걸음마 떼기 (1/2)

marco x brown·2020년 3월 7일
5

지난 포스트를 작성하면서 Celery가 무엇에 쓰는 물건인지를 간략하게 개요 성격으로 알아보았다. 이번 포스트에서는 실제 Python 코드를 작성하면서 Celery 사용법을 익히고 각 브로커에 대해 더 자세하게 다룰 것이다.

Celery 공식 문서를 번역하여 작성하였다. 의욕이 앞서다보니 의역, 오역이 난무할 수 있다.


Celery는 배터리를 포함하고 있는 태스크 큐(Task Queue)이다. 사용법이 간단하기 때문에 너무 복잡하게 생각하지 않아도 된다. 작성한 프로그램이 다른 언어로 확장 및 통합될 수 있도록 최고의 사례를 중심으로 설계되었으며, 프로덕션 환경에서 이러한 시스템을 실행하는 데에 필요한 도구 등을 지원한다.

이번 튜토리얼을 통해 Celery 기초 사용법에 대해 배울 것이다.

  • 메시지 전송(브로커)및 설치
  • Celery 설치 및 첫 태스크 생성
  • 워커 시작 및 태스크 호출
  • 서로 다른 상태로 전환할 때의 태스크 추적 및 리턴값 검사

브로커 선정

Celery는 메시지 송수신에 대한 솔루션이 필요하다. 일반적으로 메시지 브로커라는 별도의 서비스 형태로 제공된다. 메시지 브로커에 대한 선택지가 몇 개 있다.

RabbitMQ

RabbitMQ는 기능적으로 완전하고 안정적이며 내구성이 뛰어나고 설치가 쉽다. 프로덕션 환경에서 최고의 선택지이다. 자세한 정보는 Using RabbitMQ에서 확인할 수 있다.

  • Ubuntu/Debian에서의 설치
$ sudo apt-get install rabbitmq-server
  • Docker
$ docker run -d -p 5672:5672 rabbitmq
  • macOS
$ brew install rabbitmq

설치가 완료되면 브로커는 백그라운드에서 실행되고 있고 아래의 메시지를 확인할 수 있을 것이다.

Starting rabbitmq-server: SUCCESS.

macOS 환경은 약간 다르다. 별도로 커맨드를 실행해야 백그라운드로 올릴 수 있다.
(필자의 경우 10.15.2 Catalina)

...
To have launchd start rabbitmq now and restart at login:
  brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
  rabbitmq-server
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Redis

Redis 또한 완전한 기능을 갖추고 있지만, in-memory 구조이기 때문에 갑작스러운 종료 또는 전원 차단 상황에서 데이터 손실에 더욱 취약하다. 자세한 정보는 Using Redis에서 확인할 수 있다.

  • Docker
$ docker run -d -p 6379:6379 redis

기타 브로커

위의 브로커들 외에도 Amazon SQS를 포함하여 다른 브로커들도 존재한다.

Broker Overview

Celery 설치

PyPI에 올라와 있기 때문에 pip 또는 easy_install 커맨드로 설치할 수 있다.

$ pip install celery

애플리케이션 작성

제일 먼저 Celery 인스턴스를 생성하자. 이 인스턴스는 태스크 생성, 워커 관리 등 Celery에서 수행하고자 하는 모든 일들에 대한 엔트리포인트로 사용되므로 다른 모듈에서 import가 가능해야 한다.

이 튜토리얼에서는 단 하나의 모듈에 모든 걸 담을 것이지만 더 큰 프로젝트의 경우 Celery 전용 모듈을 사용하고 싶어할 것이다.

tasks.py 모듈을 만들자.

# tasks.py

from celery import Celery


app = Celery('tasks', broker='pyamqp://guest@localhost//')


@app.task
def add(x, y):
    return x + y

첫 번째 인자는 현재 모듈의 이름이다. 메인(__main__) 모듈에 태스크들이 정의될 때 이름이 자동으로 생성될 수 있도록 할 경우에만 필요하다.

두 번째 인자는 사용하고자 하는 메시지 브로커의 URL이다. 여기서는 디폴트 옵션인 RabbitMQ를 사용하고 있다.

RabbitMQ의 URL은 amqp://localhost, Redisredis://localhost 형식으로 URL을 사용한다.

Celery 워커 서버 실행

worker 인자를 넣어 워커가 위에서 작성한 프로그램을 실행하도록 할 수 있다.

$ celery -A tasks worker --loglevel=info


celery@YOONui-MacBook-Pro.local v4.4.1 (cliffs)

Darwin-19.2.0-x86_64-i386-64bit 2020-03-07 23:53:43

[config]
.> app:         tasks:0x107af0550
.> transport:   amqp://guest:**@localhost:5672//
.> results:     disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2020-03-07 23:53:44,025: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-07 23:53:44,043: INFO/MainProcess] mingle: searching for neighbors
[2020-03-07 23:53:45,085: INFO/MainProcess] mingle: all alone
[2020-03-07 23:53:45,109: INFO/MainProcess] celery@YOONui-MacBook-Pro.local ready.

현재 물려 있는 태스크의 주소값, 메시지 브로커에 대한 정보들이 출력되며 Celery가 준비되었다고 말해주고 있다.

프로덕션 환경에서는 워커를 데몬으로 백그라운드에서 실행하기를 원할 것이다. 그러기 위해서는 supervisord와 같은 플랫폼별 데몬 툴이 필요하다. macOS에서는 launchd이다.

도움말이 필요하다면 아래 커맨드로.

$ celery worker --help
$ celery help

태스크 호출

delay() 메소드로 태스크를 호출할 수 있다. 이 메소드는 태스크 호출을 ㅎ과적으로 제어할 수 있는 apply_asyncx() 메소드에 대한 바로가기이다.

>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: 8c9fec82-a48f-47c0-a3e2-6574e35dd8f9>
>>>

워커의 콘솔 출력에서 알 수 있듯이 앞에서 시작했던 워커에 의해 태스크가 처리되었다.

태스크를 호출하면 AsyncResult 인스턴스가 리턴된다. AsyncResult 인스턴스는 태스크의 상태를 체크하거나, 태스크가 완료될 때까지 기다리거나, 리턴값을 얻거나, 태스크가 실패할 경우 예외처리와 traceback을 얻는 데에 사용할 수 있다.

기본적으로 호출 결과는 사용되지 않는다. 원격 프로시저 호출을 수행하거나 데이터베이스에서 태스크 결과를 추적하려면 결과 백엔드(?)를 사용하도록 Celery를 설정해야 한다. 다음 섹션에서 알아보자.

참고 문서

https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps

profile
개발자로 크는 중

0개의 댓글