Celery 첫 걸음마 떼기 (2/2)

marco x brown·2020년 3월 8일
3

결과 유지

태스크의 상태를 추적하기 위해서는 그 상태를 어딘가로 보내거나 저장해야 한다. SQLAlchemyDjango ORM, Memcached, Redis, RPC(RabbitMQ/AMQP) 정도의 result backend 선택지들이 있고 자체적으로 정의하여 사용할 수도 있다. result backend를 뭐라고 번역해야 할지 도통 모르겠다.

이번에는 일시적으로 메시지로 상태를 다시 보내는 rpc result backend를 사용한다. Celery 클래스의 backend 인자로 result backend를 명시한다. (config 모듈을 사용한다면 result_backend 변수로 설정할 수 있다.)

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

Redis를 result backend로 사용하지만, 여전히 메시지 브로커로 RabbitMQ를 사용하고자 하는 경우 아래처럼 설정하면 된다. (인기가 많은 조합이다.)

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

Result Backend에 대한 자세한 정보

result backend를 설정했고, AsyncResult 인스턴스를 변수에 담아서 태스크를 호출해보자.

>>> result = add.delay(4, 4)

ready() 메소드는 태스크 처리가 완료되었는지를 리턴한다.

>>> result.ready()
False

아래처럼 완료될 때까지 기다릴 수 있지만, 이는 비동기 호출을 동기 호출로 변환하기 때문에 거의 사용되지 않는다.

>>> result.get(timeout=1)
8

튜토리얼에서는 8을 리턴한다고 하는데 직접 해보니 예외들을 뱉어내고 있다.

Traceback (most recent call last):
  File "/Users/yoon/workspace/learn-python/celery/venv/lib/python3.7/site-packages/
celery/backends/asynchronous.py", line 268, in _wait_for_pending
    on_interval=on_interval):
  File "/Users/yoon/workspace/learn-python/celery/venv/lib/python3.7/site-packages/
celery/backends/asynchronous.py", line 53, in drain_events_until
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yoon/workspace/learn-python/celery/venv/lib/python3.7/site-packages/
celery/result.py", line 228, in get
    on_message=on_message,
  File "/Users/yoon/workspace/learn-python/celery/venv/lib/python3.7/site-packages/
celery/backends/asynchronous.py", line 200, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/Users/yoon/workspace/learn-python/celery/venv/lib/python3.7/site-packages/
celery/backends/asynchronous.py", line 272, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

태스크가 예외를 발생시키면 get() 메소드는 예외를 다시 발생시킬 것이다. 하지만 propagate 인자를 명시함으로써 무시할 수 있다.
그런데 아무런 결과도 나타나지 않고 무한로딩에 걸린 것 같다.
예외가 발생한다면, 오리지널 traceback으로 접근할 수 있다.

>>> result.traceback

주의:
Backend는 리소스를 사용하여 결과를 전송하고 저장한다. 리소스를 해제하려면 태스크 호출 이후 리턴된 모든 AsyncResult 인스턴스에 get() 또는 forget() 메소드를 사용해야 한다.

Configuration

가전 제품처럼 Celery는 많은 설정이 필요없다. 입력과 출력이 있는데, 입력은 브로커에게 연결되어야 하고 출력은 result backend에 선택적으로 연결될 수 있다. 그러나 자세히 보면 슬라이더, 다이얼 그리고 버튼이 많은 뚜껑이라는 설정들이 있다.

기본 설정은 대부분의 유즈 케이스에 대해 충분하지만 필요에 따라 Celery가 정확하게 동작하게 설정할 수 있는 옵션들이 있다. 설정 가능한 옵션들에 익숙해지는 것도 좋은 생각이다. Configuration and defaults

앱에 직접 설정하거나 configuration 전용 모듈을 사용할 수 있다. 예로, task_serializer 설정을 변경하여 태스크 페이로드를 시리얼라이징하는 데에 사용되는 시리얼라이저를 구성할 수 있다.

app.conf.task_serializer = 'json'

update() 메소드로 한번에 여러 설정을 할 수 있다.

...
app.conf.update(
    task_serializer='json',
    accept_content=['json'],	# JSON을 제외한 다른 content 설정들은 무시
    result_serializer='json',
    timezone='Asia/Seoul',
    enable_utc=True,
)

더 큰 프로젝트에서는 config 전용 모듈을 추천한다. 주기적인 태스크 간격을 하드코딩하는 것과 태스크 라우팅 옵션은 권장하지 않는다. 중앙에서 유지하는 것이 훨씬 낫다. 사용자가 태스크 동작을 제어할 수 있도록 하므로 라이브러리에 한해서 특히 맞는 말이다. 중앙 집중식 config를 통해 SysAdmin은 시스템 문제 상황에서 간단하게 변경할 수 있다.

app.config_from_object() 메소드를 호출하여 외부 config 모듈을 사용할 수 있다.

app.config_from_object('celeryconfig')

보통 celeryconfig라고 하지만 원하는대로 이름을 지을 수도 있다.

위 코드대로라면 현재 디렉토리나 Python 경로에 celeryconfig.py 모듈이 있어야 한다. 아래처럼 작성할 수 있다.

# celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Seoul'
enable_utc = True

config 파일이 잘 동작하는지, 문법 오류가 없는지 확인하려면 아래 커맨드를 사용하자.

$ python -m celeryconfig

config 파일의 강력함을 보여주기 위해 다음과 같이 오작동하는 태스크를 큐에 라우팅하는 방법이 있다.

# celeryconfig.py

task_routes = {
	'tasks.add': 'low-priority',
}

또는 라우팅 대신 작업 속도를 제한하여 분당 10개의 태스크만 처리할 수 있다.

task_annotations = {
	'tasks.add': {'rate_limit': '10/m'},
}

메시지 브로커로 RabbitMQ나 Redis를 사용한다면 런타임에 워커에게 작업 속도 제한을 거는 것도 가능하다.

$ celery -A tasks control rate_limit tasks.add 10/m

worker@example.com: OK
	new rate limit set successfully

Routing Tasks
task_anntations()
Monitoring and Management Guide

참고 문서

https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps

profile
개발자로 크는 중

2개의 댓글

comment-user-thumbnail
2020년 3월 9일

첫 걸음마 축하드려요!

답글 달기
comment-user-thumbnail
2023년 12월 21일

결과처리서버 라고 번역하죠

답글 달기