MQTT 다룰 때 잠깐 언급하고 넘어갔었었는데, 다시 이렇게 마주치게 되었습니다. 오늘 다뤄볼 내용은 messageMQ입니다. Node.js 디자인 패턴이라는 책에도 나오고, 다른 책에서도 messageMQ에 대해 다룰 때 항상 rabbitmq가 나와 설치와 연결 messagemq까지 사용해보고자 합니다.
메세지 큐는 메세지를 정확하게 전달할 수 있습니다. mqtt에서는 메세지를 받지못하면 그냥 날라가지만, 메세지 큐를 사용했을 경우, 장애나 오작동시 서버가 메세지를 받지못하는 상황이 될경우 메세지 큐에 저장되고 누적되고, 해당 서버가 다시 돌아왔을 때 큐에 축적된 메세지가 다시 서버로 전달됩니다.
설치가 끝났다면 localhost:15672 주소로 들어가면 창이뜹니다.
위의 localhost:15672 로 들어가서 만들어줘야할께 있습니다.
messagemq는 mqtt와 다르게 메세지를 보내면 바로 가는게 아니라, 가운데 거쳐야할 곳이 많습니다.
이 exchange는 다른 queue에게 routing 하는 역할을 합니다.
- Direct Exchange : 전체 라우팅 키를 일치시켜 메시지를 라우팅 합니다.
- Topic Exchange : 라우팅 키와 일치하는 glob-like 패턴을 사용하여 메시지를 배포합니다.
- Fanout exchange : 제공된 라우팅 키를 무시하고 연결된 모든 대기열에 메시지를 브로드캐스트합니다.
위의 화면으로 들어가서 add new Exchange를 해줘야합니다. rabbitTest라는 exchange를 만들 것입니다.
=> refused가 떳다면 권한 설정으로 인해 되지않는 것입니다.
rabbit command 창으로 가서 rabbitmqctl set_permissions [사용자이름] ".*", ".*", ".*"
입력해줘야합니다. 대충 다 허용해준다는 뜻입니다. 궁금하시면 더 찾아보세요 특정 사용자만 가능하도록 권한설정이 가능합니다.
그리고 다시 add Exchange를 하면 정상적으로 만들어지는 걸 볼 수 있습니다.
MQTT 같은 경우는 메세지를 보내면 UDP와 비슷하게 뿌리기만 하지, 응답을 확인하거나 쌓이거나 하지않습니다. Queue라는 곳에 쌓음으로 혹시나 서버가 다운되더라도 나중에 복구된 이후에 받지못한 데이터를 이 Queue에서 받아 올 수 있습니다.
- duralble : 브로커가 다시 시작되면 대기열이 자동으로 생성됨을 의미합니다.
- exclusive : 대기열이 하나의 특정 가입자 연결에만 바인딩된다는 걸 의미합니다 연결이 닿으면 큐가 삭제됩니다.
- Auto-delete : 마지막 구독자가 연결을 끊을 떄 대기열이 삭제됩니다.
queue 창으로 들어가서 add Queue를 합니삳!
queue를 만들었다면 이제 exchange에서 queue를 binding 해줘야합니다.
익스체인지와 큐 사이의 링크입니다. 익스체인지에서 도착하는 메시지를 필터링하는데 사용되는 라우팅 키 또는 패턴을 정의합니다.
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
// 아이디: 비밀번호 / 15672 하면 안됩ㄴ디
const queueName = 'queueTest';
amqp.connect(url, function(error, connect){
if(error){
console.log(error);
return;
}
connect.createChannel(function(error, channel){
if(error){
console.log(error);
return;
}
channel.assertQueue(queueName, {durable: true}, function(error){
let recevieMessage = function(){
channel.get(queueName, {}, function(error, message){
if(error){
console.log(error);
}
else if(message){
console.log(message.content.toString());
channel.ack(message);
setTimeout(recevieMessage, 1000);
}
else{
console.log('NO MESSAGE');
setTimeout(recevieMessage, 1000);
}
});
}
recevieMessage();
});
});
});
var amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
const queueName = 'queueTest';
amqp.connect(url, function(error, connect){
if(error){
console.log(error);
return;
}
connect.createChannel(function(error, channel){
if(error){
console.log(error);
return;
}
channel.assertQueue(queueName, {durable: true}, function(error){
let sendData = {
type : 'message',
message : 'test message!'
};
channel.sendToQueue(queueName, encode(sendData), {
persistent: true
});
});
});
});
function encode(doc) {
return Buffer.from(JSON.stringify(doc));
}
정상적으로 데이터가 넘어오는 걸 볼 수있습니다.
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
async function main () {
const connection = await amqp.connect(url)
const channel = await connection.createConfirmChannel()
await channel.assertQueue('queueTest')
const generateObj = generateTask(searchHash, alphabet, maxLength, batch_size)
for (const task of generateObj) {
channel.sendToQueue('queueTest', Buffer.from(JSON.stringify(task)))
}
await channel.waitForConfirms()
channel.close()
connection.close()
}
main().catch(err => { console.log(err)})
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
async function main () {
const connection = await amqp.connection(url)
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('queueTest')
channel.consume(queue, async (rawMessage) => {
const found = processTaks(JSON.parse(rawMessage.content.toString()))
if (found) {
await channel.sendToQueueu('queueTest', Buffer.from('Found'))
}
await channel.ack(rawMessage)
})
}
main().catch(err => { console.log(err)})
channel.consume()을 통해서 작업을 수신하는 방법을 볼 수 있습니다. 일치하는 항목 발견될 때마다 결과를 queueTest를 통해 수집기에 다시 보냅니다
메시지가 완전히 처리된 다음에는 channel.ack()를 사용하여 모든 메시지에 응답 확인을 합니다.
const amqp = require('amqplib/callback_api');
const url = 'amqp://guest:guest@localhost:5672';
async function main () {
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('queueTest')
channel.consume(queue, msg => {
console.log(msg)
})
}
main().catch(err => { console.log(err)})