Rabbitmq unAcked message

Jae Min·2023년 12월 7일
1
post-thumbnail

현재 서비스에서 결제 서비스와 대출 서비스를 운영하고 있다.
결제 서비스, 대출 서비스 모두 결제 및 대출이 일어나면 해당 액션에 대한 트리거로 웹훅을 전송해주는 모듈이다.

예를 들어서, 결제가 일어났다 그러면 '결제완료' 혹은 대출을 신청했다 그러면 '대출신청' 이라는 이벤트를 웹훅으로 전송해준다.

우리 서비스에 녹여져있는 webhook flow 를 보기 전에 rabbitmq 에 대해서 간단하게 알아보자.

✅ What is RabbitMQ

비동기 작업을 하기 위해서 주로 사용하는 메세지큐의 종류 중 하나이다.
ex) rabbitmq, kafka, etc...

			------------ broker ------------
producer -> [exchange -> queue(binding rule)] -> comsumer

producer 가 메세지를 발송하면, 무조건 모든 메세지는 broker의 exchange 로 전달된다.
exchange 는 어떤 queue 로 보내줄지 binding rule 에 의거해서 queue 로 메세지를 복사하고 이를 consumer 가 수신한다.

consumer 가 message 를 처리하는 방법

  1. consumer 가 메세지를 받기만 하면 큐에서 메세지를 삭제한다.

    • msa 안에서 MQ 를 사용해서 [gateway -> microservivce] 같이 내부 api request 를 처리할 때 주로 사용할 수 있다.
      내부 api에서 일어나기 때문에 언제든 다시 발송하게 할 수 있기 때문
  2. consumer 가 메세지를 받고 잘 받았다는 신호를 보내면 큐에서 삭제한다.
    이 신호를 consumer acknowledge 라고 한다.

    • 외부 모듈 혹은 외부 api 와 통신할 때 주로 사용한다.
      정상적으로 정해진 로직으로 처리를 하지 못하거나 시스템 적으로 문제가 발생하면 다시 수신을 해서 처리를 해야 하는 경우가 생기기 때문.

exchange 종류

binding

각 exchange 에는 어떤 큐에 어떤 방식으로 메세지를 보낼지에 대한 규칙을 지정할 수 있다. 이 규칙은 binding 이라고 한다.

✅ webhook flow

해당 웹훅을 우리 서비스(api server)가 점검중이거나 혹은 예기치 못한 상황에 전송을 할 수도 있기 때문에, API Gateway 가 트리거로 걸린 람다 함수를 사용하고 있다. 아래는 해당 AWS lambda diagam 이다

1.결제 및 모듈에서 우리가 제공한 API Gateway 의 엔드포인트로 웹훅을 전송하면
2.그에 할당된 람다함수가 작동하여 RabbitMQ 로 메세지를 날린다.
3.그러면 서비스 API 서버가 해당 MQ에서 메세지를 가져간다.
4.메세지를 수신하고 로직을 처리하고 나서 Consumer Acknowledge 를 MQ 쪽으로 반환해준다.

✅ subscribe & publish message

@MessagePattern({ cmd: 'webHook' })
  async webHook(@Ctx() context: RmqContext): Promise<void> {
    const content = JSON.parse(context.getMessage().content);
    const data = content?.data;
    const jsonData = data?.body;

    const mqChannel = context.getChannelRef();
    try {
      await this.service.webHook(jsonData);
      mqChannel.ack(context.getMessage());
    } catch (error) {
      mqChannel.ack(context.getMessage());
    }
  }

mqChannel.ack() 를 통해서
"나 메세지 잘 수신했고, 잘 처리했어~" 라고 MQ 에 다시 알려준다.
그러면 해당 메세지를 MQ 는 다시 publish 하지 않는다.
이 말을 반대로 생각하면,
mqChannel.ack() 를 하지 않는다면, "잘 못 받았나? 무슨 문제가 있나?" 라고 간주해서 반복적으로 Publish 한다.

✅ publish noAck message

테스트로나 서비스로나 한번도 발송하지 않은 메세지들이 계속 위 코드를 통해서 수신하고 있던 것을 확인할 수 있었다.

[Nest] 1  - 12/07/2023, 4:58:48 AM   ERROR [RpcExceptionsHandler] Unexpected token o in JSON at position 1
SyntaxError: Unexpected token o in JSON at position 1 at JSON.parse (<anonymous>)

원치 않은 형식의 message 가 날라와서 파싱하는데 있어서 계속 문제가 발생했다.
근데 이게 30분 마다 로그에 반복적으로 찍혔다.
혹시 noAck message 를 계속 전송하는건가?
생각해보니, 개발환경에 올리기 전에 로컬에서 테스트 했었는데, 이때 람다를 자체적으로 실행하기 위해서 input message 형태를 임의로 수정해서 테스트를 했었던 메세지가 아직도 수신되고 있었던 것이었다.
2023.11.29 일에 publish 된 메세지가 2023.12.07 에도 수신하고 있었다.
aws console 창을 확인해보니 다음과 같았다.

  • 첫번째 이미지를 보면, Redelivered 메세지가 반복적으로 날라오고 있었다. consumer가 ack 처리를 하지 않아 재전송한 것으로 보인다.
  • 두번째 이미지를 보면 해당 메세지를 두번째 채널에서 Unacked 처리한 메세지가 하나 있다는 것을 알 수 있다.

리소스 낭비를 막기 위해 해당 메세지를 큐에서 지우기 위해 찾아보니 아래 화면에서 가장 하단부에 위치한 Purge messages 를 통해서 특정 큐에 존재하는 메세지를 지울 수 있다고 한다.

아래는 aws rabbitmq console 이다.

하지만 그래도 계속 위 에러가 발생했다.
에러가 발생하고 나서 콘솔창을 확인해보니 계속해서 Unack 처리된 메세지가 있고, Redelivered 를 하고 있었다.

✅ how to solve

unAck message 가 할당된 channel 을 먼저 close 하고 나서, 그 후에 purge message 를 하라고 나와있다. 개발환경에서 해당 channel에 연결중이고, 결제 docker container 만 잠시 내리고 purge 했다.

다른 방법으로 JSON.parse 가 실패해도 무조건 ack 처리를 하는것도 나쁘지 않을듯 하다. 애초에 정해진 메세지가 아닌 이상한 형태의 메세지가 왔다는 것 자체가 해당 메세지로 로직을 처리할 필요가 없으니까.

해결.


Ref

https://stackoverflow.com/questions/25114230/rabbitmq-purge-a-queue-from-all-of-its-unacked-messages
https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/#gsc.tab=0
https://www.rabbitmq.com/tutorials/amqp-concepts.html

profile
자유로워지고 싶다면 기록하라.

0개의 댓글