AMQP 입문 - celery 공식문서 번역

Jaeuk Ko·2023년 2월 17일
0

celery의 AMQP Primer 를 번역했습니다.

Messages

message는 header와 body로 구성돼있다. Celery는 header를 message의 content type과 내용의 encoding을 저장하는데 쓴다. content type은 보통 message를 직렬화(serialize)하는데 쓰이는 직렬화 포맷(serialization format)이다. body는 실행될 task의 이름과 task의 id(uuid), task에 적용될 아규먼트들, 그리고 재시도 횟수나 ETA(??)같은 몇개의 추가적인 메타 데이터들을 갖고 있다.

아래는 Python dictionary 타입으로 된 task message 예제이다.

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

Producers, consumers and brockers

message 발신자를 일반적으로 publisher나 producer라 부르며, message 수신자를 consumer라 한다.

message를 producer에서 consumer로 라우팅 해주는 message server를 broker라 한다.

exchanges, queues, rouing keys

  1. message는 exchange들로 보내어진다.
  2. 하나의 exchange가 message들을 하나 혹은 여러개의 queue로 보낸다. 몇개의 exchange 타입이 존재하며 각각 서로 다른 라우팅 방식을 제공하거나 혹은 서로 다른 라우팅 시나리오를 수행한다.
  3. message들은 consume(메세지 수신 혹은 수행)되기 전 까지 queue 내부에서 대기한다.
  4. message는 acknowledge되면 queue에서 삭제된다.

    consume은 실제로 message가 수행이 됐을때?아니면 consumer로 수신되었을때?
    acknowledge는??

message가 수신/발신되기 위해서는 아래와 같은 단계를 거친다.
1. exchange 생성
2. queue 생성
3. exchange와 queue 바인딩

Celery는 task_queues 속에 queue에 필요한 객체를 자동으로 생성해준다(queue의 auto_declare가 False인 경우에는 안됨)

3개의 queue에 대한 예제. 하나는 video를 위한 'video' queue, 하나는 image를 위한 'image' queue, 마지막으로 나머지 다른 모든것을 위한 'default' queue

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

exchange types

exchange type은 message가 exchange를 통해 queue로 라우팅 되는 방식을 정의한다. 표준적으로 정의된 exchange type은 direct, topic, fanout, headers가 있다. 또한 비표준 exchange타입도 플러그인으로써 Rabbit-MQ에 적용이 가능하다.

direct exchanges

Direct exchanges는 정확한 라우팅 키로 매칭되어, 만약 큐가 'video' 라는 라우팅 키에 바인딩 되어 있을 때, 그 queue는 오직 'video' 라우팅 키를 갖는 messge만 받는다.

topic exchanges

topic exchange는 라우팅 키들을 "."으로 분리된 단어들과 와일드 카드 문자들(* - 한개의 단어, # - 0 혹은 더 많은 단어들)로 매칭한다.

"usa.news, usa.weather, norway.news, norway.weather" 와 같은 단어들 뿐만 아니라, *.news (모든 뉴스), usa.# (USA에 관련된 모든 것들), usa.weather (모든 USA 날씨) 도 바인딩 될 수 있다.

exchange와 관련된 api 명령들

exchange.declare(exchange_name, type, passive,
durable, auto_delete, internal)

exchange 선언
- passive :exchange가 생성되지 않음. 또 이를 통해 exchange가 이미 존재하는지의 여부를 확인할 수 있다.
- durable : durable(지속적인)한 exchange는 영속적이다. 예를들어, 브로커가 재시작될 때에도 지속된다.

  • auto_delete : 이 exchange를 사용하는 queue가 더 없을 경우, 이 exchange는 자동으로 삭제된다.
queue.declare(queue_name, passive, durable, exclusive, auto_delete)

queue 선언
- exclusive : exclusive한 queue는 오직 현재의 커넥션에서만 사용될 수 있다. exclusive 옵션이 설정되면 auto_delete 가 된다.

queue.bind(queue_name, exchange_name, routing_key)

라우팅 키를 통해 queue와 exchange를 바인딩함.
바인딩되지 않은 queue는 message를 받을 수 없으므로, bind가 반드시 필요하다.

queue.delete(name, if_unused=False, if_empty=False)

queue와 binding 설정을 삭제함.

exchange.delete(name, if_unused=False)

exchange 삭제

선언하는것이 반드시 "생성"을 의미하는것은 아니다. 선언을 할때 이는 객체가 존재하고 그것이 사용 가능하다고 주장하는것이다. 꼭 consumer나 producer가 제일 먼저 exchange/queue/binding을 생성해야 한다는 룰은 없다. 보통 그것들을 필요로 하는 쪽에서 먼저 그것들을 생성한다.

API 실습(hands-on with the api)

Celery에는 AMQP API에 대한 CLI 엑세스에 사용되는 celery amqp라는 도구가 함께 제공되어 queue 및 exchange의 생성/삭제, queue제거 또는 message 전송과 같은 관리작업에 엑세스 할 수 있다. AMQP가 아닌 브로커에도 사용 할 수 있지만, 이 경우에는 모든 커맨드의 사용이 제한 될 수 있다.
(redis는 amqp가 아니므로 이건 다음에 기회가 될때 계속 번역함)

profile
망원동 개발자

0개의 댓글