예약 시스템 - 중복 예약 처리1 Rabbit MQ 적용하기

Chan Young Jeong·2024년 1월 30일
0

프로젝트 Dplanner

목록 보기
1/5

Dplanner는 공유 자원 예약을 위한 폐쇄형 커뮤니티입니다.

예약 시스템을 구현 할 때 가장 고려해야하는 점은 중복된 예약이 발생되어서는 안된다는 것입니다. 이를 해결하기 위해 생각해 볼 수 있는 방법은

1. 자바 어플리케이션 코드 단에서 동기화
2. 데이터베이스 락
3. 메시지, 이벤트 큐
4. 캐싱을 통한 중복 체크

가 있습니다.

이번에는 저희 프로젝트에서 메시지 큐를 이용하여 중복예약을 어떻게 처리했는지에 대해 알아보겠습니다.


Reservation.class

예약 도메인에는 어떤 자원에 대한 예약인지(Resource), 누가 예약을 했는지(ClubMember), 예약 시간(Period) 등의 정보를 가지고 있습니다.

@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Reservation extends BaseEntity{

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @ManyToOne(fetch = LAZY)
    @JoinColumn(name = "resource_id")
    private Resource resource;

    @ManyToOne(fetch = LAZY)
    @JoinColumn(name = "club_member_id")
    private ClubMember clubMember;

    @Embedded
    private Period period; //start, end time 포함

    ...
}

ReservationService.class

예약을 처리하는 비즈니스 로직을 담당하는 클래스입니다.

 @Transactional
    public ReservationDto.Response createReservation(Long clubMemberId, ReservationDto.Create createDto) {

        Long resourceId = createDto.getResourceId();
        LocalDateTime startDateTime = createDto.getStartDateTime();
        LocalDateTime endDateTime = createDto.getEndDateTime();

        if (!isReservable(resourceId, startDateTime, endDateTime)) {
            throw new ReservationException(RESERVATION_UNAVAILABLE);
        }

								... 생략 ... 

        Reservation reservation = reservationRepository.save(createDto.toEntity(clubMember, resource));


        return ReservationDto.Response.of(reservation);
    }


isReservable(resourceId, startDateTime, endDateTime)
를 이용해서 , 해당 요청이 유효한 요청인지 검사합니다. 문제가 발생하는 이유는 많은 요청이 한 번에 몰렸을 때, 각 요청마다 각 스레드에서 처리가 되기 때문에, 여러 요청이 동시에 isReservable 이 True가 될 수 있고 각 요청들이 모두 save 되어 중복된 예약이 발생할 수 있습니다.

메시지 큐 도입

메시지 큐를 이용해서 중복 예약을 처리하는 과정은 다음과 같습니다.

1. 클라이언트에서 예약 요청을 서버로 보낸다.
2. 서버(Producer)에서는 해당 요청을 메시지로 바꿔 메시지 큐로 전달한다.
3. 각 큐를 consume하고 있는 Consumer가 해당 메시지를 가져와 차례로 예약을 처리한다.

이 때 가장 중요한 점은 예약 요청의 club id를 키 값으로 하여 큐에 바인딩하였다는 점입니다. 이렇게 하면 특정 club에서의 예약 요청이 특정 큐로만 전달되게 됩니다. 따라서 해당 큐를 consume하는 Consumer는 특정 club에서의 예약 요청을 순차적으로 처리할 수 있게 됩니다.

RabbitMQConfig

@Configuration
@Profile("production")
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.queue.name.1}")
    private String queueName1;

    @Value("${rabbitmq.routing.key.1}")
    private String routingKey1;

    @Value("${rabbitmq.queue.name.2}")
    private String queueName2;

    @Value("${rabbitmq.routing.key.2}")
    private String routingKey2;

    @Value("${rabbitmq.queue.name.3}")
    private String queueName3;

    @Value("${rabbitmq.routing.key.3}")
    private String routingKey3;

    /**
     * 지정된 큐 이름으로 Queue 빈을 생성
     *
     * @return Queue 빈 객체
     */
    @Bean
    public Queue queue1() {
        return new Queue(queueName1);
    }

    @Bean
    public Queue queue2() {
        return new Queue(queueName2);
    }


    @Bean
    public Queue queue3() {
        return new Queue(queueName3);
    }


    /**
     * 지정된 익스체인지 이름으로 DirectExchange 빈을 생성
     *
     * @return TopicExchange 빈 객체
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    /**
     * 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성
     *
     * @param queue   바인딩할 Queue
     * @param exchange 바인딩할 TopicExchange
     * @return Binding 빈 객체
     */
    @Bean
    public Binding binding1(Queue queue1, DirectExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with(routingKey1);
    }

    @Bean
    public Binding binding2(Queue queue2, DirectExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with(routingKey2);
    }

    @Bean
    public Binding binding3(Queue queue3, DirectExchange exchange) {
        return BindingBuilder.bind(queue3).to(exchange).with(routingKey3);
    }

    /**
     * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
     *
     * @return ConnectionFactory 객체
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

    /**
     * RabbitTemplate을 생성하여 반환
     *
     * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
     * @return RabbitTemplate 객체
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
     *
     * @return MessageConverter 객체
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule()); // JavaTimeModule 추가
        return new Jackson2JsonMessageConverter(objectMapper);
    }

}

RabbitMQController

추가해야할 부분은 Message가 RabbitMQ에 제대로 전달되지 않았을 때 예외처리가 필요하다.

@Controller
@RequiredArgsConstructor
public class ReservationRabbitMQController {

    private final RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key.1}")
    private String routingKey1;

    @Value("${rabbitmq.routing.key.2}")
    private String routingKey2;

    @Value("${rabbitmq.routing.key.3}")
    private String routingKey3;


    @PostMapping(value = "/reservations")
    public ResponseEntity createReservation(@AuthenticationPrincipal PrincipalDetails principal,
                                            @RequestBody RabbitMQDto messageDto) {

        messageDto.setClubMemberId(principal.getClubMemberId());

        if (principal.getClubId() % 3 == 0){
            rabbitTemplate.convertAndSend(exchangeName, routingKey1, messageDto);
        }else if(principal.getClubId() % 3 == 1){
            rabbitTemplate.convertAndSend(exchangeName, routingKey2, messageDto);
        }else{
            rabbitTemplate.convertAndSend(exchangeName, routingKey3, messageDto);
        }

        return ResponseEntity.ok("Message sent to RabbitMQ!");
    }

}

RabbitMQConsumer

추가해야할 부분은 createReservation이 제대로 안되었을 때 사용자에게 인앱 메시지나 푸쉬 메시지 알람 로직을 추가할 예정이다.

@Service
@Slf4j
@RequiredArgsConstructor
public class RabbitMQConsumer {
    private final ReservationService reservationService;

    @RabbitListener(queues = {"${rabbitmq.queue.name.1}"}) // , concurrency = "3")  // 여러 스레드로 하려면 동시성 문제  해결 필요
    public void consumeJsonMessage(RabbitMQDto messageDto) {
        log.info("current Thread : " + Thread.currentThread().getName());
        log.info("Received  message start");

        ReservationDto.Create createDto = ReservationDto.Create.builder()
                .resourceId(messageDto.getResourceId())
                .startDateTime(messageDto.getStartDateTime())
                .endDateTime(messageDto.getEndDateTime())
                .title(messageDto.getTitle())
                .usage(messageDto.getUsage())
                .sharing(messageDto.isSharing())
                .build();

        try {
            reservationService.createReservation(messageDto.getClubMemberId(), createDto);
        } catch (Exception e) {
            log.error("Can not Create Reservation");
        }

        log.info("Received message finish");

    }
}

주의할점

@RabbitListener(queues = {"${rabbitmq.queue.name.1}"})// concurrency = "3"

Consumer가 동작할 때 스레드 수를 설정할 수 있습니다. 이 때 Consumer에서 concurrency 설정을 1개 이상으로 설정하면, 마찬가지로 중복 예약이 발생할 수 있습니다. 따라서 스레드 수를 1개로 고정해서 사용해야 합니다. 여러 스레드를 사용하고 싶으면 추가적인 동기화 작업이 필요합니다. 혹은 더 빠르게 처리하고 싶다면 큐를 늘려 Consumer 수를 늘려주는 방법도 있습니다.

(카프카도 마찬가지로 동시성 보장하면 여러 스레드 사용 가능. 카프카는 1개 스레드 사용하는 대신에 여러 worker 스레드가 있음)

0개의 댓글