Dplanner는 공유 자원 예약을 위한 폐쇄형 커뮤니티입니다.
예약 시스템을 구현 할 때 가장 고려해야하는 점은 중복된 예약이 발생되어서는 안된다는 것입니다. 이를 해결하기 위해 생각해 볼 수 있는 방법은
1. 자바 어플리케이션 코드 단에서 동기화
2. 데이터베이스 락
3. 메시지, 이벤트 큐
4. 캐싱을 통한 중복 체크
가 있습니다.
이번에는 저희 프로젝트에서 메시지 큐를 이용하여 중복예약을 어떻게 처리했는지에 대해 알아보겠습니다.
예약 도메인에는 어떤 자원에 대한 예약인지(Resource), 누가 예약을 했는지(ClubMember), 예약 시간(Period) 등의 정보를 가지고 있습니다.
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Reservation extends BaseEntity{
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = LAZY)
@JoinColumn(name = "resource_id")
private Resource resource;
@ManyToOne(fetch = LAZY)
@JoinColumn(name = "club_member_id")
private ClubMember clubMember;
@Embedded
private Period period; //start, end time 포함
...
}
예약을 처리하는 비즈니스 로직을 담당하는 클래스입니다.
@Transactional
public ReservationDto.Response createReservation(Long clubMemberId, ReservationDto.Create createDto) {
Long resourceId = createDto.getResourceId();
LocalDateTime startDateTime = createDto.getStartDateTime();
LocalDateTime endDateTime = createDto.getEndDateTime();
if (!isReservable(resourceId, startDateTime, endDateTime)) {
throw new ReservationException(RESERVATION_UNAVAILABLE);
}
... 생략 ...
Reservation reservation = reservationRepository.save(createDto.toEntity(clubMember, resource));
return ReservationDto.Response.of(reservation);
}
isReservable(resourceId, startDateTime, endDateTime) 를 이용해서 , 해당 요청이 유효한 요청인지 검사합니다. 문제가 발생하는 이유는 많은 요청이 한 번에 몰렸을 때, 각 요청마다 각 스레드에서 처리가 되기 때문에, 여러 요청이 동시에 isReservable 이 True가 될 수 있고 각 요청들이 모두 save 되어 중복된 예약이 발생할 수 있습니다.
메시지 큐를 이용해서 중복 예약을 처리하는 과정은 다음과 같습니다.
1. 클라이언트에서 예약 요청을 서버로 보낸다.
2. 서버(Producer)에서는 해당 요청을 메시지로 바꿔 메시지 큐로 전달한다.
3. 각 큐를 consume하고 있는 Consumer가 해당 메시지를 가져와 차례로 예약을 처리한다.
이 때 가장 중요한 점은 예약 요청의 club id를 키 값으로 하여 큐에 바인딩하였다는 점입니다. 이렇게 하면 특정 club에서의 예약 요청이 특정 큐로만 전달되게 됩니다. 따라서 해당 큐를 consume하는 Consumer는 특정 club에서의 예약 요청을 순차적으로 처리할 수 있게 됩니다.
@Configuration
@Profile("production")
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String rabbitmqHost;
@Value("${spring.rabbitmq.port}")
private int rabbitmqPort;
@Value("${spring.rabbitmq.username}")
private String rabbitmqUsername;
@Value("${spring.rabbitmq.password}")
private String rabbitmqPassword;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.queue.name.1}")
private String queueName1;
@Value("${rabbitmq.routing.key.1}")
private String routingKey1;
@Value("${rabbitmq.queue.name.2}")
private String queueName2;
@Value("${rabbitmq.routing.key.2}")
private String routingKey2;
@Value("${rabbitmq.queue.name.3}")
private String queueName3;
@Value("${rabbitmq.routing.key.3}")
private String routingKey3;
/**
* 지정된 큐 이름으로 Queue 빈을 생성
*
* @return Queue 빈 객체
*/
@Bean
public Queue queue1() {
return new Queue(queueName1);
}
@Bean
public Queue queue2() {
return new Queue(queueName2);
}
@Bean
public Queue queue3() {
return new Queue(queueName3);
}
/**
* 지정된 익스체인지 이름으로 DirectExchange 빈을 생성
*
* @return TopicExchange 빈 객체
*/
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성
*
* @param queue 바인딩할 Queue
* @param exchange 바인딩할 TopicExchange
* @return Binding 빈 객체
*/
@Bean
public Binding binding1(Queue queue1, DirectExchange exchange) {
return BindingBuilder.bind(queue1).to(exchange).with(routingKey1);
}
@Bean
public Binding binding2(Queue queue2, DirectExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange).with(routingKey2);
}
@Bean
public Binding binding3(Queue queue3, DirectExchange exchange) {
return BindingBuilder.bind(queue3).to(exchange).with(routingKey3);
}
/**
* RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
*
* @return ConnectionFactory 객체
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(rabbitmqPort);
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory;
}
/**
* RabbitTemplate을 생성하여 반환
*
* @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
* @return RabbitTemplate 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
*
* @return MessageConverter 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule()); // JavaTimeModule 추가
return new Jackson2JsonMessageConverter(objectMapper);
}
}
추가해야할 부분은 Message가 RabbitMQ에 제대로 전달되지 않았을 때 예외처리가 필요하다.
@Controller
@RequiredArgsConstructor
public class ReservationRabbitMQController {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.routing.key.1}")
private String routingKey1;
@Value("${rabbitmq.routing.key.2}")
private String routingKey2;
@Value("${rabbitmq.routing.key.3}")
private String routingKey3;
@PostMapping(value = "/reservations")
public ResponseEntity createReservation(@AuthenticationPrincipal PrincipalDetails principal,
@RequestBody RabbitMQDto messageDto) {
messageDto.setClubMemberId(principal.getClubMemberId());
if (principal.getClubId() % 3 == 0){
rabbitTemplate.convertAndSend(exchangeName, routingKey1, messageDto);
}else if(principal.getClubId() % 3 == 1){
rabbitTemplate.convertAndSend(exchangeName, routingKey2, messageDto);
}else{
rabbitTemplate.convertAndSend(exchangeName, routingKey3, messageDto);
}
return ResponseEntity.ok("Message sent to RabbitMQ!");
}
}
추가해야할 부분은 createReservation이 제대로 안되었을 때 사용자에게 인앱 메시지나 푸쉬 메시지 알람 로직을 추가할 예정이다.
@Service
@Slf4j
@RequiredArgsConstructor
public class RabbitMQConsumer {
private final ReservationService reservationService;
@RabbitListener(queues = {"${rabbitmq.queue.name.1}"}) // , concurrency = "3") // 여러 스레드로 하려면 동시성 문제 해결 필요
public void consumeJsonMessage(RabbitMQDto messageDto) {
log.info("current Thread : " + Thread.currentThread().getName());
log.info("Received message start");
ReservationDto.Create createDto = ReservationDto.Create.builder()
.resourceId(messageDto.getResourceId())
.startDateTime(messageDto.getStartDateTime())
.endDateTime(messageDto.getEndDateTime())
.title(messageDto.getTitle())
.usage(messageDto.getUsage())
.sharing(messageDto.isSharing())
.build();
try {
reservationService.createReservation(messageDto.getClubMemberId(), createDto);
} catch (Exception e) {
log.error("Can not Create Reservation");
}
log.info("Received message finish");
}
}
@RabbitListener(queues = {"${rabbitmq.queue.name.1}"})// concurrency = "3"
Consumer가 동작할 때 스레드 수를 설정할 수 있습니다. 이 때 Consumer에서 concurrency 설정을 1개 이상으로 설정하면, 마찬가지로 중복 예약이 발생할 수 있습니다. 따라서 스레드 수를 1개로 고정해서 사용해야 합니다. 여러 스레드를 사용하고 싶으면 추가적인 동기화 작업이 필요합니다. 혹은 더 빠르게 처리하고 싶다면 큐를 늘려 Consumer 수를 늘려주는 방법도 있습니다.
(카프카도 마찬가지로 동시성 보장하면 여러 스레드 사용 가능. 카프카는 1개 스레드 사용하는 대신에 여러 worker 스레드가 있음)