Rabbitmq with Golang

Divan·2022년 6월 12일
1
post-thumbnail

Go에서 rabbitmq를 연결하고 메세지를 전달 해보자.
rabbitmq에는 publish 하는 4가지 방식 중 topic 방식을 테스트 진행하겠습니다.(Topic방식이 pub-sub방식이라고 생각하면된다.)

  • Direct Exchange
    -> Message의 Routing Key와 정확히 일치하는 Binding된 Queue로 Routing
  • Fanout Exchange
    -> Binding된 모든 Queue에 Message를 Routing
  • Topic Exchange
    -> 특정 Routing Pattern이 일치하는 Queue로 Routing
  • Headers Exchange
    ->key-value로 정의된 Header 속성을 통한 Routing

출처: https://coding-start.tistory.com/371 [코딩스타트:티스토리]


사용법 요약

  • Send
    연결 생성 → 채널 생성 → exchange선언 -> publish
  • Receive
    연결 생성 → 채널 생성 -> exchange선언 -> queue선언 → queue바인딩 → 메시지 consume

Golang code

Send

  1. 연결 생성
// 해당 parameter값으로 TCP 소켓으로 연결을 한다.

connection, err := amqp.Dial(os.Getenv("AMQP_URL"))
defer connection.Close()
  1. 채널 생성
// 채널 생성 (메세지를 주고 받는 통로라고 생각하면 될거 같습니다.)
channel, err := connection.Channel()
defer channel.Close()
  1. Exchage선언
/* rabbitmq에 borker에는 exchage와 messagequeue가 있는데, publish할때는 exchage를 통해서하게 된다.
동일한 exchange가 없을때 생선된다. 
arguments 설명
name string,-> exchage의 이름
kind string, -> Direct, Fanout, Topic, Headers
durable bool, -> 직역하면 내구성이 있는, 오래가는 이라는 뜻을 가진다.
이 파라미터는 이 뜻처럼 RabbitMQ 서비스가 종료되었을 때, 이 큐에 있는 메시지들을 보존할 것인가? 에 대한 물음이다. true를 하는 경우, 메시지의 속성에 따라 메시지를 보존하며, false의 경우에는 모든 메시지가 사라진다
autoDelete bool, ->  큐에 대한 subscriber가 없을 때, 큐를 삭제할 것인지에 대한 여부이다.
false일 경우에는 Queue를 삭제하기 위해서 QueueDelete같은 함수를 호출해 삭제해야 한다.
internal bool, -> exchange가 publisher에 의해 연결되는 것이 아니라, exchange가 다른 exchange에 의해 연결될 수 있다고 한다. 그러나 아쉽게도 혹은 다행히도 버전이 올라가면서 deprecated 되었다고 한다. 그러므로 패스!
noWait bool, -> 여부는 async 모드로 동작할지에 대한 여부를 묻는 것이다.
만약 false일 경우, 큐 선언 생성 함수는 서버에 큐를 생성하라 요청하고 그에 대한 결과를 받은 이후에 함수가 종료된다.
반대로 true인 경우, 큐 선언 함수는 서버에 큐를 생성하라고 요청하고, 즉시 함수가 종료된다.
만약 문제가 발생하는 경우에는 Channel 사용 도중에 에러가 발생하므로, 그냥 쓰면 된다.
다른 함수에도 noWait이 있는데, 동작은 다 동일하므로, 이후 설명에서는 제외한다.
args amqp.Table ->  exchange 타입에서 설명한 header 타입일때 사용하는 것과 Queue optional 설정들을 하기 위한 테이블이다.
*/

err = channel.ExchangeDeclare("exchageName", "topic", true, false, false, false, nil)

ps. 위의 내용은 한번만 실행됨(setup), 이후에는 chennel에 연결하고 publish만하면 된다.
4. Publish

/*메세지를 exchage에 전달합니다.
exchange string,-> 연결하고자하는 exchage Name
key string, -> routing key
mandatory bool, -> 직역하면 의무적인이라는 뜻을 가지며,
true이면 적어도 하나 이상의 queue에 메시지가 넘어가야 publish가 성공한다.
false이면 큐에 넘어가던 안가던 신경쓰지 않는다.
immediate bool, -> 한명의 consumer가 이 메시지를 가져갔는지에 대한 확인이다.
만약 true일때, 전달된 메시지가 있는 큐에 한명도 consumer가 없으면 에러가 발생한다.
false이면, consumer가 받던 말던 신경쓰지 않는다.
msg amqp.Publishing -> 보내고자하는 메세지
*/
channel, err := connection.Channel()
	if err != nil {
		return err
	}

err = channel.Publish(exchange, EventName(), false, false, msg)
return err

Recive

  1. 연결 생성 → 채널 생성
// 설명은 생략
connection, err := amqp.Dial(os.Getenv("AMQP_URL"))
defer connection.Close()

channel, err := connection.Channel()
defer channel.Close()
  1. exchange선언
// 설명생략(send)와 같음 
err = channel.ExchangeDeclare("exchageName", "topic", true, false, false, false, nil)
  1. queue선언
/* arg 설명
name string, -> 큐의 이름, 큐를 구분하기위한수단,
noWait bool,args amqp.Tabledurable bool, autoDelete bool, -> 설명생략
exclusive bool, -> 해당 Queue를 혼자 쓸 것인지에 대한 여부이다.
true일 경우, 오직 하나의 subscriber만 이를 사용할 수 있으며, 해당 subscriber가 연결을 종료하면 Queue는 삭제된다.
*/
_, err = channel.QueueDeclare("queueName", true, false, false, false, nil)

ps. 위의 내용은 한번만 실행됨(setup), 이후에는 chennel에 연결하고 binding, consum 만하면 된다.
4. queue바인딩

/*
name string, -> 바인딩하고자하는 queue이름
key string, ->#(여러단어), *(한단어)를 통한 문자열 패턴 매칭, publish 할때 설정되는 key에 맞게 설정하면 해당 key 일치하는 메세지만 받고, "#"으로 바인딩하면 다른 key로 매칭된 메세지도 해당 큐에 메세지로 전달된다.
exchange string, -> 바이딩하고자하는 exchage이름,
noWait bool,args amqp.Table -> 설명생략
*/
err := channel.QueueBind(l.queue, event, l.exchange, false, nil)
  1. consume
/*
queue string,  -> queue에 연결되므로 큐의 이름을 설정
consumer string, -> subscriber를 identifying 하기 위해서 conumer 이름을 갖는다.
만약 consumer가 빈 문자열이면, RabbitMQ에서 자동으로 unique한 이름을 설정해준다.
autoAck bool, exclusive bool, noLocal bool, -> 생력
noWait bool, args amqp.Table -> 생략
*/
msgs, err := channel.Consume("my_queue", "", false, false, false, false, nil)

출처 : 쪼랩전사 (https://chobowarrior.tistory.com/3)

제가 공부하고자 정리한 내용입니다. 문제가 있을시 삭제 및 수정하겠습니다. 참고자료 출저에 대한 내용은 링크를 달아 두었습니다. 감사합니다.

profile
하루 25분의 투자

0개의 댓글