01.RabbitMQ

이지훈·2024년 6월 7일

RabbitMQ

목록 보기
1/1
post-thumbnail

1. RabbitMQ 설치 [다운로드]

2. RabbitMQ 도커로 실행

docker-compose-rabbitmq.yml

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq
    volumes:
      - ./rabbitmq/etc/:/etc/rabbitmq/
      - ./rabbitmq/data/:/var/lib/rabbitmq/
      - ./rabbitmq/logs/:/var/log/rabbitmq/
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
      RABBITMQ_DEFAULT_USER: "mbti"
      RABBITMQ_DEFAULT_PASS: "1234"

docker-compose 실행 명령어

docker-compose -f docker-compose-rabbitmq.yml up -d

docker shell 접속 명령어

docker exec -it rabbitmq /bin/bash

docker 명령 실행 - management Plugin 활성화

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management

명령 정리

사용자 생성하기
rabbitmqctl add_user mbti 1234
가상 호스트 만들기 (옵션)
rabbitmqctl add_vhost myvhost
사용자에게 권한 할당하기
rabbitmqctl set_permissions -p myvhost mbti ".*" ".*" ".*"
Management Plugin 활성화
rabbitmq-plugins enable rabbitmq_management
관리자 계정  추가
rabbitmqctl set_user_tags mbti administrator
계정 삭제
rabbitmqctl delete_user mbti

http://localhost:15672/ 접속하기

3. 코드 구현

기본 테스트 코드 구현 [참조사이트]

application.properties

#### RabbitMQ
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=mbti
spring.rabbitmq.password=1234
rabbitmq.queue.name=chat-queue
rabbitmq.exchange.name=chat-exchange
rabbitmq.routing.key=key

PostMan으로 테스트를 위한 선행 작업

응답 처리를 위해 에이전트 설치
Postman-Agent-win64-0.4.25-Setup.exe

Spring Security에서 csrf 옵션을 사용 중지
http.csrf().disable();

테스트 결과

RabbitMQConfig

package com.example.jhta_3team_finalproject.config.chat;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@RequiredArgsConstructor
@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private String rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUserName;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    // org.springframework.amqp.core.Queue
    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    /**
     * 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(exchangeName);
    }

    /**
     * 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
     * Exchange 에 Queue 을 등록한다고 이해하자
     **/
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * RabbitMQ 연동을 위한 ConnectionFactory 빈을 생성하여 반환
     **/
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUserName);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

    /**
     * RabbitTemplate
     * ConnectionFactory 로 연결 후 실제 작업을 위한 Template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * 직렬화(메세지를 JSON 으로 변환하는 Message Converter)
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

RabbitMQController

package com.example.jhta_3team_finalproject.controller;//package com.example.jhta_3team_finalproject.controller;

import com.example.jhta_3team_finalproject.domain.chat.ChatMessage;
import com.example.jhta_3team_finalproject.service.chat.RabbitMQService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RequiredArgsConstructor
@Controller
public class RabbitMQController {

    private final RabbitMQService rabbitMQService;

    @RequestMapping("/chat/emergency")
    public void sendMessage(ChatMessage chatMessage) throws Exception {
        this.rabbitMQService.sendMessage(chatMessage);
    }

}

RabbitMQService

package com.example.jhta_3team_finalproject.service.chat;//package com.example.jhta_3team_finalproject.service.chat;

import com.example.jhta_3team_finalproject.domain.User.User;
import com.example.jhta_3team_finalproject.domain.chat.ChatMessage;
import com.example.jhta_3team_finalproject.domain.chat.ChatRoom;
import com.example.jhta_3team_finalproject.handler.chat.SocketHandler;
import com.example.jhta_3team_finalproject.mybatis.mapper.chat.ChatMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Queue 로 메세지를 발핼한 때에는 RabbitTemplate 의 ConvertAndSend 메소드를 사용하고
 * Queue 에서 메세지를 구독할때는 @RabbitListener 을 사용
 *
 **/
@Slf4j
@RequiredArgsConstructor
@Service
public class RabbitMQService {

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    private final RabbitTemplate rabbitTemplate;

    private final SocketHandler socketHandler;

    private final ChatService chatService;

    private final ChatSseService chatSseService;

    private final ChatMapper dao;

    /**
     * 1. Queue 로 메세지를 발행
     * 2. Producer 역할 -> Direct Exchange 전략
     */
    public void sendMessage(ChatMessage chatMessage) throws Exception {
        log.info("Send Message : {}", chatMessage.toString());

        /**
         * 2024-06-24, 모든 부서 채팅방의 방 번호를 구함
         */
        List<ChatRoom> deptChatRoomList = dao.getDeptChatRoomNum();
        deptChatRoomList.forEach(chatRoom -> {
            chatMessage.setChatRoomNum(chatRoom.getChatRoomNum());
            this.rabbitTemplate.convertAndSend(exchangeName, routingKey, chatMessage);
        });
    }

    /**
     * 1. Queue 에서 메세지를 구독
     */
    @Transactional
    @RabbitListener(queues = "${rabbitmq.queue.name}")
    public void receiveMessage(ChatMessage chatMessage) throws Exception {
        log.info("Received Message : {}", chatMessage.toString());
        chatMessage = chatService.createMessage(chatMessage);

        if(chatMessage != null) {
            ChatRoom chatRoom = new ChatRoom();
            chatRoom.setChatRoomNum(Long.valueOf(chatMessage.getChatRoomNum()));
            chatRoom.setChatSessionId(chatMessage.getSenderId());
            List<User> users = chatService.chatRoomParticipateList(chatRoom);

            /**
             * 2024-06-24, SSE 비동기 처리로 채팅방 목록 업데이트
             */
            users.forEach(user ->
                CompletableFuture.runAsync(() ->
                    chatSseService.chatRoomListRefresh(user, "chatRoomListRefresh"))
                .exceptionally(throwable -> {
                    // 개발자 담당자한테 web hook 및 전달할 있게 처리하기.
                    log.error("Exception occurred: " + throwable.getMessage());
                    return null;
                })
            );
        }

        socketHandler.sendEmergencyMessage(chatMessage);
    }

}

4. 시연 영상 [영상]

profile
ziru.log

0개의 댓글