현재 서비스에서 결제 서비스와 대출 서비스를 운영하고 있다.
결제 서비스, 대출 서비스 모두 결제 및 대출이 일어나면 해당 액션에 대한 트리거로 웹훅을 전송해주는 모듈이다.
우리 서비스에 녹여져있는 webhook flow 를 보기 전에 rabbitmq 에 대해서 간단하게 알아보자.
비동기 작업을 하기 위해서 주로 사용하는 메세지큐의 종류 중 하나이다.
ex) rabbitmq, kafka, etc...
------------ broker ------------
producer -> [exchange -> queue(binding rule)] -> comsumer
producer 가 메세지를 발송하면, 무조건 모든 메세지는 broker의 exchange 로 전달된다.
exchange 는 어떤 queue 로 보내줄지 binding rule 에 의거해서 queue 로 메세지를 복사하고 이를 consumer 가 수신한다.
consumer 가 메세지를 받기만 하면 큐에서 메세지를 삭제한다.
consumer 가 메세지를 받고 잘 받았다는 신호를 보내면 큐에서 삭제한다.
이 신호를 consumer acknowledge
라고 한다.
각 exchange 에는 어떤 큐에 어떤 방식으로 메세지를 보낼지에 대한 규칙을 지정할 수 있다. 이 규칙은 binding 이라고 한다.
해당 웹훅을 우리 서비스(api server)가 점검중이거나 혹은 예기치 못한 상황에 전송을 할 수도 있기 때문에, API Gateway 가 트리거로 걸린 람다 함수를 사용하고 있다. 아래는 해당 AWS lambda diagam 이다
1.결제 및 모듈에서 우리가 제공한 API Gateway 의 엔드포인트로 웹훅을 전송하면
2.그에 할당된 람다함수가 작동하여 RabbitMQ
로 메세지를 날린다.
3.그러면 서비스 API 서버가 해당 MQ에서 메세지를 가져간다.
4.메세지를 수신하고 로직을 처리하고 나서 Consumer Acknowledge
를 MQ 쪽으로 반환해준다.
@MessagePattern({ cmd: 'webHook' })
async webHook(@Ctx() context: RmqContext): Promise<void> {
const content = JSON.parse(context.getMessage().content);
const data = content?.data;
const jsonData = data?.body;
const mqChannel = context.getChannelRef();
try {
await this.service.webHook(jsonData);
mqChannel.ack(context.getMessage());
} catch (error) {
mqChannel.ack(context.getMessage());
}
}
mqChannel.ack()
를 통해서
"나 메세지 잘 수신했고, 잘 처리했어~" 라고 MQ 에 다시 알려준다.
그러면 해당 메세지를 MQ 는 다시 publish 하지 않는다.
이 말을 반대로 생각하면,
mqChannel.ack()
를 하지 않는다면, "잘 못 받았나? 무슨 문제가 있나?" 라고 간주해서 반복적으로 Publish 한다.
테스트로나 서비스로나 한번도 발송하지 않은 메세지들이 계속 위 코드를 통해서 수신하고 있던 것을 확인할 수 있었다.
[Nest] 1 - 12/07/2023, 4:58:48 AM ERROR [RpcExceptionsHandler] Unexpected token o in JSON at position 1
SyntaxError: Unexpected token o in JSON at position 1 at JSON.parse (<anonymous>)
원치 않은 형식의 message 가 날라와서 파싱하는데 있어서 계속 문제가 발생했다.
근데 이게 30분 마다 로그에 반복적으로 찍혔다.
혹시 noAck message 를 계속 전송하는건가?
생각해보니, 개발환경에 올리기 전에 로컬에서 테스트 했었는데, 이때 람다를 자체적으로 실행하기 위해서 input message 형태를 임의로 수정해서 테스트를 했었던 메세지가 아직도 수신되고 있었던 것이었다.
2023.11.29 일에 publish 된 메세지가 2023.12.07 에도 수신하고 있었다.
aws console 창을 확인해보니 다음과 같았다.
리소스 낭비를 막기 위해 해당 메세지를 큐에서 지우기 위해 찾아보니 아래 화면에서 가장 하단부에 위치한 Purge messages
를 통해서 특정 큐에 존재하는 메세지를 지울 수 있다고 한다.
하지만 그래도 계속 위 에러가 발생했다.
에러가 발생하고 나서 콘솔창을 확인해보니 계속해서 Unack
처리된 메세지가 있고, Redelivered
를 하고 있었다.
unAck message 가 할당된 channel 을 먼저 close 하고 나서, 그 후에 purge message 를 하라고 나와있다. 개발환경에서 해당 channel에 연결중이고, 결제 docker container 만 잠시 내리고 purge 했다.
해결.
https://stackoverflow.com/questions/25114230/rabbitmq-purge-a-queue-from-all-of-its-unacked-messages
https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/#gsc.tab=0
https://www.rabbitmq.com/tutorials/amqp-concepts.html