[Message Queue] 분리된 서버간의 통신

장준혁·2024년 2월 18일

MessageQueue

목록 보기
1/1
post-thumbnail

📨 서버 통신


현재 진행하고 있는 프로젝트의 구조이며 Web 과 Application이 분리되어 있음을 확인 할 수 있다.

초기에 기획했던 프로젝트는 Application만이 존재 했으며 웹 사이트는 구현 예정에 없었다.

포스기를 통한 매장 관리 목적으로 웹 사이트를 추가 하면서 규모가 커지게 되었다.

당시 App 서버는 Nest로 구현이 이루어지고 있었기에 추가 투입되는 개발자는 동일한 기술 스택을 가지고 있어야 했다.

하지만 당시 학생들 사이에서 생소했던 프레임워크라 개발 인력을 모집하기 쉽지 않았고 그로 인해 웹 사이트는 개발 인력이 상대적으로 풍부한 Spring Boot을 이용해 개발하기로 결정했다.

🤔 어떤 부분에서 Server 통신이 이루어 지는가?

프로젝트에서 Server 통신이 필요한 부분의 일부를 나열해 보겠다.

1. 사용자의 메뉴 주문, 매장 점주의 신규 주문 요청 확인
2. 매장 점주의 주문 접수, 사용자에게 주문 접수 처리 통보
3. 매장 점주의 주문 취소, 사용자에게 주문 취소 처리 통보
4. 사용자의 메뉴 수령 수령 완료 및 리뷰 요청 알림 전송
5. 매장을 찜 사용자 전체에게 매장 광고 알림 전송...

서버와 서버 간의 통신이 필요하고 경우에 각 Client에 알림을 화면에 출력해야 하기 때문에 FCM(Firebase Cloud Messaging) 역시 필요로 했다.

🧐 Sever to Server Connection

그렇다면 어떤 방식으로 Server 간의 메시지를 송신하고 수신할 수 있을까?

Socket 통신

  • 클라이언트와 서버가 특정 포트를 통해 양방향 통신을 하는 방식

  • 데이터 전달 후 연결이 끊어지는 것이 아니라 계속해서 연결을 유지 → HTTP에 비해 더 많은 자원 소모

  • 클라이언트와 서버가 실시간으로 계속하여 데이터를 주고받아야 하는 경우에 유리

  • 실시간 동영상 스트리밍이나 온라인 게임 등에 사용

Socket 통신을 사용하기 위해서는 큰 문제가 있었는데 만약 Web, App 중 어떤 서버라도 다운이 된다면 메시지가 휘발될 수 있다는 것이다.

서버 통신을 사용하기 위한 상황에는 주로 주문 관련 상황이 많았는데 메시지 자체가 휘발된다면 후 처리가 쉽지 않아 보였다.

그래서 서버가 다운되더라도 메시지 자체가 휘발되지 않는 다른 방식을 찾아봐야 했다.

물론 메시지가 휘발되지 않도록 추가적인 작업(추가 서버, 백업)을 통해 보완할 수 있지만 너무 번거로웠다.

Mysql Polling

  • 일정 주기마다 서버에 요청(Request)을 보내서 상태를 확인 한다.
  • 요청 주기가 짧을 수록 부하가 커진다.
  • 실시간처럼 보일 수는 있지만 실시간은 아니다.
  • 요청 주기가 길면 빠른 응답을 해야 하는 상황에 대처하기가 힘들다.

데이터베이스에 저장된 데이터를 통해서 변경 감지를 하기 때문에 서버가 다운 되어도 후 처리가 가능할 것이다.

데이터베이스의 변경 점이 생긴다면 Polling을 통해서 후 처리를 해주는 방식을 해줄 수 있을 것이다.

하지만 즉각적인 응답이 불가능 하며 상황이 발생했을 때 즉각적인 반응이 필요한 경우라면 적합하지 않을 수 있다.

데이터베이스의 변경 점이 없어도 매번 감지해야 하기에 서버 부하가 우려된다.

Polling 직후 이벤트가 발생 한다면 다음 Polling 까지 인식이 불가능 할 수 있다.

RabbitMQ

  • 비동기(Asynchronous): Queue에 넣어두기 때문에 나중에 처리할 수 있다.
  • 낮은 결합도(Decoupling): 애플리케이션과 분리할 수 있다.
  • 탄력성(Resilience): 일부가 실패 시 전체에 영향을 받지 않는다.
  • 과잉(Redundancy): 실패할 경우 재실행이 가능하다.
  • 신뢰성(Guarantees): 작업이 처리된 걸 확인할 수 있다.
  • 확장성(Scalable): 다수의 프로세스가 큐에 메시지를 보낼 수 있다.

서버가 다운되어도 메시지를 큐에 저장하고 있기 때문에 소실될 위험을 방지할 수 있다.

메시지를 송신하는 서버에서 Queue에 메시지 송신 시에 Push 메커니즘을 통해서 수신하는 서버에게 알려주기에 수신 서버가 매번 변경 감지 등을 할 필요가 없다.

기능이 커질수록 관리가 복잡해질 수 있다.

🔊 Call Back


Polling 간격 사이에 주문 및 알림 요청이 발생할 경우 다음 Polling 이 실행 되기 전까지 인식 하지 못 할 수 있다.
그렇기에 변경 감지를 위한 많은 Call 이 필요하다.

API 요청 ~ 종료까지 대략 30분 정도의 시간이 걸린다고 가정, Polling 주기를 1분으로 잡는다면 서버에서 Call이 대략 25 ~ 30번 정도 발생 할 것 이다.


반면 RabbitMQ 는 Push 방식을 사용하기 때문에 매번 변경 감지를 할 필요가 없다.
(Message Enqueue 시에 RabbitMQ 에서 수신 서버로 Message Push)

따라서 API 요청 ~ 종료 까지의 시간이 달라도 Call 수는 동일 할 것 이다.

🚀 서버 부하

Polling 은 데이터베이스의 변경점이 없어도 주기적으로 조회 쿼리를 생성하는데 이때 서버의 부하는 없는지에 대해서 의문점이 생겼다.

먼저 @Scheduled 을 적용해서 Order 테이블에 새로 업데이트 된 주문 내역이 있는지 확인

📄 PollingServiceImpl.Class 의 일부

	@Override
    @Scheduled(fixedDelay = 5000)
    public void pollingUpdatedOrder() {

        try {
            // Redis Polling 기록 확인

            RedisOrderPollingTimeDto orderLastPollingTimeDto =
                    redisService.findLastOrderPollingTimeById("orderLastPollingTime");
            //레디스 Order Polling Time Dto 반환


            // 기록이 없다면 Exception 발생


            // 기록 되어있다면..

            Timestamp lastPollingTime = Timestamp.valueOf(orderLastPollingTimeDto.getPollingTime());
            //get Redis Polling Time

            List<Orders> updatedOrders = oOrderRepository.getUpdatedOrdersByLastPollingTime(
                    lastPollingTime
            );
            //새로 변경 및 추가된 주문들

            // 주문 변경 및 로직 생략...

        } catch (OrderPollingTimeIsNotPresentException e) {
            // Redis 에 기록된 Polling Time 이 존재 하지 않는다면..

            List<Orders> updatedOrders = oOrderRepository.getUpdatedOrdersByLastPollingTime(
                    new Timestamp(
                            System.currentTimeMillis()
                    )
            );
            //새로 변경 및 추가된 주문들

        }

        redisService.saveOrderPollingTime(
                RedisOrderPollingTimeDto
                        .builder()
                        .pollingOrderId("orderLastPollingTime")
                        .pollingTime(
                                (new Timestamp(System.currentTimeMillis())).toString())
                        //현재 시간을 문자열로 변경 후 저장
                        .build()
        );
        //Redis Poling Time Update

    }

Last Polling Time 은 Redis에 저장해서 관리 했으며 Redis 는 Timestamp 타입 관리가 번거로워 String 로 변경 후 상황에 맞게 변환 하는 형태로 진행 했다.

해당 코드를 주기적으로 실행 시킨 후 PrometheusGrafana 로 시각화 해서 확인 했다.

SpringBoot 시각화 (Prometheus, Grafana) 와 차이점이 살짝 있지만 거의 동일한 환경에서 테스트 하였다. (DashBoard 만 다르게 적용 해서 테스트)

  • Polling Scheduling Server Status
  • Default State Server

주문 로직을 고려하며 주기를 짧게 설정했으며 Polling 을 적용한 서버의 부하가 증가한 것을 확인 할 수 있었다.

구현

그렇다면 Docker 및 Spring Boot 로 RabbitMQ를 구현 해보겠다.

🐳 Docker 설정

RabbitMQ 3.12.11 버전의 이미지를 Pull 한 후 Container 환경 에서 실행 한다.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:3.12.11

Docker 컨테이너 내부에서 RabbitMQ의 관리 플러그인을 활성화

docker exec {컨테이너 ID} rabbitmq-plugins enable rabbitmq_management

RabbitMQ 작동

http://localhost:15672

기본적으로 Username, Password 는 guest 로 설정 되어있다.
Default Username = guest
Default Password = guest

⌨️ Spring Boot 설정

📄 RabbitMQConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    /**
     * 지정된 큐 이름으로 Queue 빈을 생성
     *
     * @return Queue 빈 객체
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    /**
     * 지정된 익스체인지 이름으로 DirectExchange 빈을 생성
     *
     * @return TopicExchange 빈 객체
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    /**
     * 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성
     *
     * @param queue    바인딩할 Queue
     * @param exchange 바인딩할 TopicExchange
     * @return Binding 빈 객체
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
     *
     * @return ConnectionFactory 객체
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        // CachingConnectionFactory 객체를 생성합니다.
        // RabbitMQ 서버와의 연결을 캐시하여 재사용 가능하게 만드는 클래스
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

        connectionFactory.setHost(rabbitmqHost); // RabbitMQ 서버의 호스트명 또는 IP 주소를 설정
        connectionFactory.setPort(rabbitmqPort); // RabbitMQ 서버의 포트 번호를 설정
        connectionFactory.setUsername(rabbitmqUsername); // RabbitMQ 서버에 로그인하는 데 사용할 사용자 이름을 설정
        connectionFactory.setPassword(rabbitmqPassword); // RabbitMQ 서버에 로그인하는 데 사용할 비밀번호를 설정

        // Publisher Confirms 기능을 활성화하고, 그 모드를 SIMPLE 로 설정
        // SIMPLE 모드에서는 각 메시지가 RabbitMQ 서버에 성공적으로 전달된 후에 개별적으로 confirm(확인)을 받음.
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);

        // 설정이 완료된 CachingConnectionFactory 객체를 반환
        return connectionFactory;
    }

    /**
     * RabbitTemplate을 생성하여 반환
     *
     * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
     * @return RabbitTemplate 객체
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정

        // RabbitTemplate에 confirm callback을 설정합니다.
        // Publisher Confirms 기능이 활성화된 상태에서 RabbitMQ 서버가 메시지를 성공적으로 받았는지 여부를 확인할 때 사용됩니다.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // 메시지가 성공적으로 RabbitMQ 서버에 전달되었다면 "ACK"를 출력합니다.
            if(ack) {
                System.out.println("ACK");
            } else {
                // 메시지가 RabbitMQ 서버에 전달되지 못했다면 "NACK"과 함께 원인을 출력합니다.
                System.out.println("NACK: " + cause);
            }
        });

        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
     *
     * @return MessageConverter 객체
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    //////////////////CONSUMER
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            ConnectionFactory connectionFactory
    ) {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        return factory;
    }



    //설정 파일
    @Bean(name = "sendType")
    public PropertiesFactoryBean propertiesFactoryBean() throws Exception {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        ClassPathResource classPathResource = new ClassPathResource("properties/server-to-server.properties");

        propertiesFactoryBean.setLocation(classPathResource);

        return propertiesFactoryBean;
    }
}

Rabbit Template은 메시지를 보내는 역할을 하게 된다.
이는 일반적으로 Push 메커니즘에 해당하며 Rabbit Template를 사용하여 메시지를 보낼 때, 메시지는 RabbitMQ로 전송(push) 될 것이다.

SimpleRabbitListenerContainerFactory는 메시지를 받는 역할을 하게 된다.

Rabbit 서버에 메시지가 도착하면 이를 자동으로 받아오는 역할을 하며 마찬가지로 Push 메커니즘에 해당한다.

서버와 서버가 One to One 관계로 구성 되어있기 때문에 Direct Exchange를 선택하게 되었다.

📤 Producer (송신 서버)

📄 OrderController.Class (주문 접수)

    @ApiOperation(
            value = "주문 접수",
            notes = "사장의 신규 주문 요청에 대한 주문을 접수 요청 진행"
    )
    @PostMapping("/order/accept")
    public NonDataSuccessResponseDto orderAccept(
            HttpServletRequest request,
            @ApiParam(value = "주문 접수를 진행 할 주문 엔티티 DB 식별 ID", required = true)
            @RequestParam Long orderId
    ) {
        manageOrderService.orderAccept(
                (Long) request.getAttribute("ownerStoreId"),
                orderId
        );

        return cControllerMapper.noContentResponseSuccessDto("주문 접수 상태로 변경 되었습니다.");
    }

주문 접수 요청
📄 OrderServiceImpl.Class (주문 접수)

	@Override
    @Transactional(rollbackFor = Exception.class)
    public void orderAccept( //신규 주문 을 접수
            Long storeId,
            Long orderId
    ) {

        Orders orders = oOrderRepository.
                getOrderWithDetails(
                        storeId,
                        orderId
                );
        //주문 ID 로 해당하는 주문과 주문 정보들을 조인해서 가져온다
        //매장 토큰에 해당하는 정상적인 주문 ID 값 자격 검증

        orders.setOrderAccept();
        //신규 요청 주문의 상태 를 접수로 변경
        //주문 상세 엔티티에 매장의 주문 접수 시간 설정

        oOrderRepository.save(orders);
        //주문 엔티티 업데이트 , save

        messageEnqueueService.sendOrderToMessageQueue(
                exchangeName, //Queue Exchange
                routingKey, //Queue Routing Key
                orderAcceptCode,    //Order Type Code
                RabbitOrderDto
                        .builder()
                        .orderId(orderId) // 상태를 변경 할 주문 ID
                        .storeId(storeId) // 해당 주문 을 보유하는 매장 ID
                        .build()
        );
    }

RabbitMQ 에서 관리하고 있는 Message Queue 를 지정 하기 위해
ExchangeName, RoutingKey, 전송할 객체 를 지정 해준다.

📄 MessageEnqueueServiceImpl.Class (메시지 Enqueue)


	@Override
    public void sendOrderToMessageQueue(
            String exchangeName,
            String routingKey,
            Integer orderType,
            RabbitOrderDto rabbitOrderDto //데이터 객체
    ) {
        try {
            Boolean sendMessageStatus = rabbitTemplate.invoke(operations -> { //메시지 송신 성공 여부
                rabbitTemplate.convertAndSend( 
                //RabbitMQ 객체로 변환 및 송신
                        exchangeName, //Exchange Name 지정
                        routingKey, //Routing Key 지정
                        RabbitMessageDto //전송할 객체
                                .builder() //Build
                                .type(orderType) // CustomCode
                                .data( //해당 데이터 객체
                                        rabbitOrderDto
                                )
                                .build()
                );
                //Direct Exchange

                // RabbitMQ 서버로부터 Publisher Confirm 을 대기.
                // 만약 모든 메시지가 성공적으로 전송되지 않았다면, 예외를 발생
                rabbitTemplate.waitForConfirmsOrDie(5000);

                return true;
            });
            log.info("send Order To Message Queue : {}", sendMessageStatus);
            // 메시지 전송이 모두 성공했을 때의 로직을 여기에 작성합니다.
        } catch (AmqpException e) {
            // 메시지 전송에 실패했을 때의 로직을 여기에 작성합니다.
            // 예를 들어, 실패 로그를 출력하거나, 실패한 메시지를 다시 보내는 등의 동작을 수행할 수 있습니다.
            log.error("RabbitMQ Order Message send failed : {}", e.getMessage());
        }
    }

코드에서 작성한 RabbitMessageDto 는 전송할 데이터 객체를 뜻하며 상황에 맞게 변경이 가능하다.

📄 RabbitMessageDto.Class

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RabbitMessageDto {
    private Integer type;
    private Object data;
}

📥 Consumer (수신 서버)

📄 MessageDequeueServiceImpl.Class (메시지 Dequeue)

    @Override
    @Transactional(rollbackFor = Exception.class)
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = "bootToNode.queue", //전달 받을 Queue
                    durable = "true"
            ),
            exchange = @Exchange(value = "bootToNode.exchange"),
            key = "bootToNode.key")
    )
    //Consumer
    public void receivedBootMessage(
            RabbitMessageDto rabbitMessageDto
    ) {
        log.info("전달받은 Message Data : {}", rabbitMessageDto);

    }

@Rabbit Listener : 방법을 RabbitMQ 메시지 듣는 이로 선언, RabbitMQ로부터 메시지를 수신한다.

bindings = @Queue Binding(...) : 큐, 교환기, 그리고 라우팅 키 사이의 관계를 설정.

value = @Queue(...) : 이 부분은 메시지가 전달될 큐를 정의.
durable = "true"는 큐가 지속적이라는 것을 나타낸다.
즉, 브로커가 재시작될 경우에도 큐가 유지.

exchange = @Exchange(value = "Exchange 명"): 이는 메시지를 받기 위해 사용할 교환기를 정의.

key = "Key 명" : 라우팅 키를 정의, 라우팅 키는 교환기가 큐를 결정하는 데 사용하는 값이다. 라우팅 키를 사용하여 교환기는 메시지를 "bootToNode. queue"라는 큐로 전달 한다.

결과



http://localhost:15672 에 접속 하여 단건의 메시지 트레픽이 발생 한 것을 알 수 있었다.

Queue에 Consumer 가 존재하지 않는다면 Message 가 소비되지 않으므로 Queue 에 계속 남아 있게 된다.

Consumer 가 존재 한다면 즉시 Dequeue 하므로 RabbitMQ 대시보드에서는 확인이 힘들 수 있다.

📗 정리

프로젝트를 진행 하면서 가장 난해 했던 부분이 분리된 서버 였었다.

초기 개발 팀원 들의 기술 스택이 달라서 서버가 분리 될 수 밖에 없었고 서버가 분리 되면서 Web 과 App 도 분리 되었다.

App에서 일반 사용자의 주문 및 결제가 이루어지면 Web의 매장 승인이 이뤄져야 하는데 서버가 분리 되어있기에 데이터를 교환이 꼭 필요했다.

일반적으로 서버가 분리 되어 프로젝트를 진행했던 경험이 없었기에 더 난해한 상황이였다.
흐름도를 구상하면서 겁도 많이 먹었다;;

Polling 을 사용하면 로직 자체가 굉장히 단순하게 구현이 되기 때문에 추후 유지 보수할때의 이점이 있을거라 판단했으며 RabbitMQ 를 도입한다는 것 자체가 추가적인 기술이 포함 되는 것이므로 추후 로직이 추가된다면 개별적으로 관리하기가 복잡하다고 생각했다.

가장 아쉽게 느꼈던 부분은 바로 Polling을 사용하면서 즉각적인 응답을 받을 수 없다는 점이였으며, 실시간처럼 보이게 하려면 Polling 주기를 짧게 가져가야 하는데, 실제 프로젝트에서 이를 적용하면 단순 조회 이상의 많은 쿼리가 생성된다.

Polling 주기를 짧게 설정하면 서버에 부담을 주는 단점이 있고, 반대로 Polling 주기를 길게 가져가면 실시간성이 크게 떨어지는 문제가 발생했다.

결국 RabbitMQ 를 사용하면서 해결 했지만 간헐적으로 이벤트가 발생하는 상황 이라면 Polling 을 적용해봐도 좋을 것 같다.

당시 가지고 있던 기술 개념으로 작성된 글 이므로 정확하지 않을 수 있으며 문제가 발견 된다면 수정 하겠습니다.

profile
wkd86591247@gmail.com

0개의 댓글