celery 공식문서의 Routing Tasks를 번역하였습니다
task를 라우팅하는 가장 쉬운 방법은 task_create_missing_queues 세팅을 사용하는 것이다.
이 세팅을 키면, 아직 task queues에 이름없는 queue가 생성된다. 이를 통해 task라우팅을 손쉽게 수행할 수 있다.
일반적인 task들을 다루는 두개의 서버 x, y와 feed 관련 작업만 다루는 z 서버가 있다 가정하자. 이러한 구성을 사용 할 수 있다.
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
이를 활성화 하면, feed 가져오기 작업(import_feed)은 "feeds" 큐로 라우팅 되고, 다른 모든 작업들은 기본적인 queue로 라우팅된다.
혹은 feed.taks 네임스페이스에 있는 모든 태스크들을 매칭하기 위해, 전역 패턴 매칭 심지어 정규 표현식까지 사용할 수 있다.
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
만약 매칭 패턴들의 순서가 중요하다면 items 포맷으로 라우터를 지정해줘야 한다.
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
task_routes 세팅은 딕셔너리 타입일수도 있고 리스트 타입일수도 있다. 이련 경우에는, 리스트나 딕셔너리를 튜플로 감싸줘야 한다.
라우터 설정이 끝난 뒤에, z 서버만을 feeds 큐를 처리하기 위해 구동시킬 수 있다.
user@z:/$ celery -A proj worker -Q feeds
원하는 만큼 큐를 지정할 수 있어서, 이 서버 프로세스를 아래와 같이 디폴트 큐로 지정할 수 있다.
user@z:/$ celery -A proj worker -Q feeds,celery
app.conf.task_default_queue = 'default'
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
AMQP가 아닌 REDIS나 SQS는 변경을 지원하지 않으므로, "exchange"와 큐의 이름을 동일하게 해주어야 한다.
다시 일반적인 task들을 다루는 두개의 서버 x, y와 feed 관련 작업만 다루는 z 서버가 있다 가정하자. 아래와 같은 구성을 사용할 수 있다.
from kombu import Queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
task_queues는 큐 객체들의 리스트이다. 만약 exchange나 echange_type값을 바꾸지 않았다면, 이 값들은 task_default_exchange와 task_default_exchange_type 세팅에서 가져오게 된다.
태스크를 feed_tasks 큐로 라우팅하기 위해, task_routes 세팅을 추가할 수있다.
task_routes = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}
또한 routing_key아큐먼트를 사용해서 Task.apply_async() 나 send_task()로 오버라이딩 할 수도 있다.
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
queue='feed_tasks',
routing_key='feed.import')
z서버가 feed queue에서만 처리되도록 하기 위해 셀러리 구동시 celery worker -Q 옵션을 사용한다.
celery -A proj worker -Q feed_tasks --hostname=z@%h
서버 x와 y는 디폴트 큐를 사용하도록 반드시 표기해주어야 한다.
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
만약 다른 exchange에 있지만 추가하고 싶은 다른 queue가 있다면 exchange와 exchange type을 지정해주기만 하면 된다.
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('feed_tasks', routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)
Celerㅛ Redis 전송은 우선순위 필드를 존중(???) 하긴 하지만, 실제로 Redis에는 우선순위 개념이 없다. 따라서 Redis로 우선순위를 구현하기 위해서는 아래의 사항을 참고할 필요가 있다.
우선순위에 따라 작업을 예약하려먼 queue_order_strategy 전송 옵션을 구성해야한다.
app.conf.broker_transport_options = {
'queue_order_strategy': 'priority',
}
우선순위는 각각의 queue에 대해 n개의 리스트를 생성함으로써 지원된다.비록 10개(0~9)의 우선순위 레벨이 있지만, 이 우선순위 레벨들은 자원을 세이브하기 위해 기본적으로 4개 수준으로 통합되었다는 뜻이다(???). 즉, celery라는 큐는 실제로는 4개의 큐로 분할됨을 뜻한다.
celery라 이름붙여진 큐가 가장 우선순위가 높으며, 다른 큐들은 분할자(기본적으로 x06x16)를 갖게 되고, 이 뒤에 우선순위 숫자가 붙게 된다.
['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169']
-> celery + \06\16(분할자) + 3 (우선순위)
만약 더 많은 우선순위 레벨이나 다른 분할자를 지정하고자 할 때에는 broker_transport_options에서 'prioty_steps'와 'sep'옵션을 지정할 수 있다.
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'sep': ':',
'queue_order_strategy': 'priority',
}
이렇게 세팅을 하면 아래와 같은 queue가 만들어진다.
['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']
이렇게 구현된 우선순위들은 절대 서버 수준에서 구현된 우선순위들만큼 좋을리는 없고, 기껏해야 근사치 정도일 수 있다. 다만, 어플리케이션 수준에서는 충분할 수 있다.