NodeJS and RabbitMQ 구현

임쿠쿠·2022년 2월 20일
0
post-thumbnail

1. 요약

1) AuthService : 회원가입 및 로그인 후 토큰 발급 / 7070 포트
2) ProductService: 물품 생성 및 구입 / 8080 포트
3) OrderService: 구입한 물품 List 오더 처리 / 9090 포트

4) rabbitMQ 적용

  • Product Service에서 물품 구입 시, Order Queue에 구입한 물품 List 전달
  • Order Service에서 Order Queue Consume 후, 결과에 대해 Product Queue 전달
  • Product Service에서 Order Queue Consume 후 결과 리턴

5) 소스코드
https://github.com/kimkevin90/node_rabitMQ

2. 코드 구현

  • sequelize 및 express 에 대한 내용은 생략합니다.

npm i express jsonwebtoken amqplib nodemon
npm install --save mysql2 sequelize

1) AuthService 생성

const express = require('express');

const app = express();
const PORT = process.env.PORT_ONE || 7070;
const { sequelize, User } = require('./models');


const jwt = require('jsonwebtoken');

app.use(express.json());

// 로그인 구현 비밀번호 해쉬 생략
app.post('/auth/login', async (req, res) => {
  const { email, password } = req.body;
  const user = await User.findOne({ where: { email } });
  if (!user) {
    return res.json({ message: "User doesn't exist" });
  }
  if (password !== user.password) {
    return res.json({ message: 'Password Incorrect' });
  }
  const payload = {
    email,
    name: user.name,
  };
  jwt.sign(payload, 'secret', (err, token) => {
    if (err) console.log(err);
    else return res.json({ token });
  });
});


// 회원가입
app.post('/auth/register', async (req, res) => {
  const { email, password, name } = req.body;
  const userExists = await User.findOne({ where: { email } });
  if (userExists) {
    return res.json({ message: 'User already exists' });
  }
  const newUser = new User({
    email,
    name,
    password,
  });
  await newUser.save();
  return res.json(newUser);
});


sequelize
  .sync({ force: true })
  .then(() => {
    console.log('데이터베이스 연결 성공');
  })
  .catch((err) => {
    console.error(err);
  });


app.listen(PORT, () => {
  console.log(`Product-Service at ${PORT}`);
});

2) ProductService 생성

const express = require('express');

const app = express();
// const { Sequelize } = require('sequelize');
const PORT = process.env.PORT_ONE || 8080;
const { sequelize } = require('./models');
const amqp = require('amqplib');

const { Product } = require('./models');
const isAuthenticated = require('../isAuthenticated');

const { Op } = require('sequelize');

let order;

let channel,
  connection;

app.use(express.json());


sequelize
  .sync({ force: false })
  .then(() => {
    console.log('데이터베이스 연결 성공');
  })
  .catch((err) => {
    console.error(err);
  });

async function connect() {
  const amqpServer = 'amqp://localhost:5672';
  connection = await amqp.connect(amqpServer);
  channel = await connection.createChannel();
  await channel.assertQueue('PRODUCT');
}
connect();

app.post('/product/buy', isAuthenticated, async (req, res) => {
  const { ids } = req.body;
  const products = await Product.findAll({
    where: { id: { [Op.in]: ids } },
  });
  // DB에서 User가 구입한 Product List를 찾은 후 ORDER Queue 생성
  channel.sendToQueue(
    'ORDER',
    Buffer.from(
      JSON.stringify({
        products,
        userEmail: req.user.email,
      }),
    ),
  );
  
  // Order Service에서 PRODUCT Queue Consume
  channel.consume('PRODUCT', (data) => {
    console.log('Consume Product');
    order = JSON.parse(data.content);
    channel.ack(data);
  });
  return res.json(order);
});

app.post('/product/create', isAuthenticated, async (req, res) => {
  const { name, description, price } = req.body;
  const newProduct = new Product({
    name,
    description,
    price,
  });
  await newProduct.save();
  return res.json(newProduct);
});

app.listen(PORT, () => {
  console.log(`Product-Service at ${PORT}`);
});

3) OrderService 생성

const express = require('express');

const app = express();
const PORT = process.env.PORT_ONE || 9090;
const { sequelize } = require('./models');
const amqp = require('amqplib');

const { Order } = require('./models');

let channel,
  connection;

sequelize
  .sync({ force: false })
  .then(() => {
    console.log('데이터베이스 연결 성공');
  })
  .catch((err) => {
    console.error(err);
  });

app.use(express.json());

async function createOrder(products, userEmail) {
  let total = 0;
  for (let t = 0; t < products.length; ++t) {
    total += products[t].price;
  }
  const newOrder = new Order({
    products: JSON.stringify(products),
    user: userEmail,
    total_price: total,
  });
  await newOrder.save();
  return newOrder;
}

async function connect() {
  const amqpServer = 'amqp://localhost:5672';
  connection = await amqp.connect(amqpServer);
  channel = await connection.createChannel();
  await channel.assertQueue('ORDER');
}
// PRODUCT SERVICE에서 받아온 ORDER QUEUE Consume
connect().then(() => {
  channel.consume('ORDER', async (data) => {
    console.log('Consuming ORDER service');
    const { products, userEmail } = JSON.parse(data.content);
    const newOrder = await createOrder(products, userEmail);
    channel.ack(data);
    // DB에 저장 후 결과 값 PRODUCT QUEUE로 생성
    channel.sendToQueue(
      'PRODUCT',
      Buffer.from(JSON.stringify({ newOrder })),
    );
  });
});

app.listen(PORT, () => {
  console.log(`Order-Service at ${PORT}`);
});

4) 인증 미들웨어 생성

const jwt = require('jsonwebtoken');

module.exports = async function isAuthenticated(req, res, next) {
  const token = req.headers.authorization.split(' ')[1];

  jwt.verify(token, 'secret', (err, user) => {
    if (err) {
      return res.json({ message: err });
    }
    req.user = user;
    next();
  });
};

3. 테스트

1) Docker로 rabbitMQ 실행

docker run -p 5672:5672 rabbitmq

2) AuthService / ProductService 실행

3) 회원가입 및 로그인 후 토큰 적용

4) ProductService 물품 구입 POST 요청

5) OrderService 실행 및 Consume 확인

  • 물품 구입 POST 요청 3번 후, OrderService 실행하면 다음과 같이 3번은 Consume 실행

4. 결론

  • Service 레이어를 분리한 후, 해당 서비스에 오류가 난 후 재 가동하면, 쌓인 Queue를 Consume하여 서비스를 재게한다.
  • 추후 AWS AutoScaling 및 Multi - AZ를 적용한 후 테스트 해보고 싶다.
profile
Pay it forward

0개의 댓글