[RabbitMq] MessageMQ

Vorhandenheit ·2022년 10월 6일
0

JS/Node 

목록 보기
63/63

[RabbitMQ] MessageMQ

MQTT 다룰 때 잠깐 언급하고 넘어갔었었는데, 다시 이렇게 마주치게 되었습니다. 오늘 다뤄볼 내용은 messageMQ입니다. Node.js 디자인 패턴이라는 책에도 나오고, 다른 책에서도 messageMQ에 대해 다룰 때 항상 rabbitmq가 나와 설치와 연결 messagemq까지 사용해보고자 합니다.

메세지 큐는 메세지를 정확하게 전달할 수 있습니다. mqtt에서는 메세지를 받지못하면 그냥 날라가지만, 메세지 큐를 사용했을 경우, 장애나 오작동시 서버가 메세지를 받지못하는 상황이 될경우 메세지 큐에 저장되고 누적되고, 해당 서버가 다시 돌아왔을 때 큐에 축적된 메세지가 다시 서버로 전달됩니다.

설치

설치가 끝났다면 localhost:15672 주소로 들어가면 창이뜹니다.

1. 셋팅

위의 localhost:15672 로 들어가서 만들어줘야할께 있습니다.

(1) Exchange

messagemq는 mqtt와 다르게 메세지를 보내면 바로 가는게 아니라, 가운데 거쳐야할 곳이 많습니다.
이 exchange는 다른 queue에게 routing 하는 역할을 합니다.
- Direct Exchange : 전체 라우팅 키를 일치시켜 메시지를 라우팅 합니다.
- Topic Exchange : 라우팅 키와 일치하는 glob-like 패턴을 사용하여 메시지를 배포합니다.
- Fanout exchange : 제공된 라우팅 키를 무시하고 연결된 모든 대기열에 메시지를 브로드캐스트합니다.


위의 화면으로 들어가서 add new Exchange를 해줘야합니다. rabbitTest라는 exchange를 만들 것입니다.

=> refused가 떳다면 권한 설정으로 인해 되지않는 것입니다.
rabbit command 창으로 가서 rabbitmqctl set_permissions [사용자이름] ".*", ".*", ".*" 입력해줘야합니다. 대충 다 허용해준다는 뜻입니다. 궁금하시면 더 찾아보세요 특정 사용자만 가능하도록 권한설정이 가능합니다.
그리고 다시 add Exchange를 하면 정상적으로 만들어지는 걸 볼 수 있습니다.

(2) Queue

MQTT 같은 경우는 메세지를 보내면 UDP와 비슷하게 뿌리기만 하지, 응답을 확인하거나 쌓이거나 하지않습니다. Queue라는 곳에 쌓음으로 혹시나 서버가 다운되더라도 나중에 복구된 이후에 받지못한 데이터를 이 Queue에서 받아 올 수 있습니다.
- duralble : 브로커가 다시 시작되면 대기열이 자동으로 생성됨을 의미합니다.
- exclusive : 대기열이 하나의 특정 가입자 연결에만 바인딩된다는 걸 의미합니다 연결이 닿으면 큐가 삭제됩니다.
- Auto-delete : 마지막 구독자가 연결을 끊을 떄 대기열이 삭제됩니다.


queue 창으로 들어가서 add Queue를 합니삳!

(3) Binding

queue를 만들었다면 이제 exchange에서 queue를 binding 해줘야합니다.

익스체인지와 큐 사이의 링크입니다. 익스체인지에서 도착하는 메시지를 필터링하는데 사용되는 라우팅 키 또는 패턴을 정의합니다.

2. Test

수신 코드

const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
// 아이디: 비밀번호 / 15672 하면 안됩ㄴ디
const queueName = 'queueTest';

amqp.connect(url, function(error, connect){
    if(error){
        console.log(error);
        return;
    }
    connect.createChannel(function(error, channel){
        if(error){
            console.log(error);
            return;
        }
        channel.assertQueue(queueName, {durable: true}, function(error){
            let recevieMessage = function(){
                channel.get(queueName, {}, function(error, message){
                    if(error){
                        console.log(error);
                    }
                    else if(message){
                        console.log(message.content.toString());
                        channel.ack(message);
                        setTimeout(recevieMessage, 1000);
                    }
                    else{
                        console.log('NO MESSAGE');
                        setTimeout(recevieMessage, 1000);
                    }
                });
            }
            recevieMessage();
        });
    });
});

송신 코드


var amqp = require('amqplib/callback_api');

const url = 'amqp://guest:guest@localhost:5672';
const queueName = 'queueTest';

amqp.connect(url, function(error, connect){
    if(error){
        console.log(error);
        return;
    }
    connect.createChannel(function(error, channel){
        if(error){
            console.log(error);
            return;
        }
        channel.assertQueue(queueName, {durable: true}, function(error){
            let sendData = {
                type : 'message',
                message : 'test message!'
            };
            channel.sendToQueue(queueName, encode(sendData), {
                persistent: true
            });
        });
    });
});

function encode(doc) {  
    return Buffer.from(JSON.stringify(doc));
}

정상적으로 데이터가 넘어오는 걸 볼 수있습니다.

좀 더 살펴보자

point to pointo 통신 유형

  • send.js
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';


async function main () {
    const connection = await amqp.connect(url)
    const channel = await connection.createConfirmChannel()
    await channel.assertQueue('queueTest')


    const generateObj = generateTask(searchHash, alphabet, maxLength, batch_size)

    for (const task of generateObj) {
        channel.sendToQueue('queueTest', Buffer.from(JSON.stringify(task)))
    }

    await channel.waitForConfirms()
    channel.close()
    connection.close()
}
main().catch(err => { console.log(err)})
  • exchange나 binding이 없기 떄문에 설정이 훨씬 간단합니다.
  • confirmChannel : 추가 기능이 있는 채널을 생성합니다.
    - waitForConfirms : 모든 메세지가 로컬 대기열에서 발송되기 전에 애플리케이션이 브로커에 연결을 너무 빨리 닫는 걸 방지해줍니다.
  • receiver
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';


async function main () {
    const connection = await amqp.connection(url)
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('queueTest')

    channel.consume(queue, async (rawMessage) => {
        const found = processTaks(JSON.parse(rawMessage.content.toString()))
        if (found) {
            await channel.sendToQueueu('queueTest', Buffer.from('Found'))
        }
        await channel.ack(rawMessage)
    })
}
main().catch(err => { console.log(err)})

channel.consume()을 통해서 작업을 수신하는 방법을 볼 수 있습니다. 일치하는 항목 발견될 때마다 결과를 queueTest를 통해 수집기에 다시 보냅니다
메시지가 완전히 처리된 다음에는 channel.ack()를 사용하여 모든 메시지에 응답 확인을 합니다.

  • 결과 수집기
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';


async function main () {
    const connection = await amqp.connect(url)
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('queueTest')

    channel.consume(queue, msg => {
        console.log(msg)
    })
}
main().catch(err => { console.log(err)})
profile
읽고 기록하고 고민하고 사용하고 개발하자!

0개의 댓글