이제 드디어 메시지큐를 이용한 작업을 시작할 예정이다.
먼저 RabbitMQ를 이용한 작업을 할 차례였고,
RabbitMQ를 통한 메시지 처리도 API Gateway 통해야 하는 줄 알았다.
그리고 각 서비스마다 독립적인 DB를 가지기 때문에 테이블간의 연관관계 설정을 위해서,
예를 들면 Product가 User을 FK로 가지기 위해서 연관 관계를 설정하는 타이밍을 위해
User 서비스에서 User정보를 넘겨주려는 API 역시 생성하려 하였다.
이 두가지 논점을 먼저 해결하고 이번 포스팅의 목적인 메시지큐 구축 작업을 작성하도록 하겠다.
먼저 User 정보를 위한 API 설계는 진행하지 않았다.
이유 역시 두가지 정도가 있는데
기존에 생성한 CustomHeader인 X-User-Id에 User를 식별할 정보가 담겨져있다.
메시지 큐를 이용한 데이터 교환은 비동기 작업이다.
이 두 가지 내용이다.
X-User-Id에는 UserId로 식별할 수 있는 email이 담겨져있고,
해당 헤더는 이미 존재하는지 확인하는 작업후 생성되는 헤더이기 때문에 일반적인 상황에서는 추가적으로 확인하는 작업이 필요하지 않다.
따라서 Product를 생성하는 동기 처리까지 오기 전에 이미 검증을 끝내었기 때문에 API를 생성할 이유가 없는 것이다.
다만 User 서비스에 실제 해당 ID(email)가 존재하는지 100% 보장할 수는 없기 때문에 문제가 발생할 수 있으나 해당 문제는 밑에서 다시 다루도록 하겠다.
위의 이유가 사실 따지고보자면 1개의 이유이긴 하다.
일단 내용을 보자
Product 서비스에서 Product를 생성하는 과정 : 동기 작업
Product 서비스에서 User 서비스 API를 호출하여 정보 교환 : 동기 작업
그렇지만 소제목으로 적어뒀듯이 메시지큐는 비동기 작업이다.
이유는 User의 삭제, 변경등의 이유이다.
기존 RDB의 Cascade 역할을 대체하기 위함으로도 볼 수 있는 과정으로
서로 독립된 DB를 사용하는 만큼 데이터 동기화를 통한 일관성(Consistency) 원칙을 지키기 위함이다.
정확하게는 최종적 일관성(Eventual Consistency) 을 보장하기 위함이다.
(나중에 SAGA 패턴에 대해 공부하고 글을 적어보자...)
위에서 했던 얘기를 조금 더 이어나가보자면
(잘못된 정보 주의)
사실 확인하는게 데이터 자체의 안정성 면에서는 더 좋다고 생각된다.
해당 부분을 해결하려면 Auth - User 관계처럼 로그인할 때 한 번의 HTTP 동기 처리가 이루어지는 것 처럼 User 정보를 받아오는것으로 해결할 수 있다.
단 처리해야 하는 시점을 명확히 해야 한다.
내가 생각하기에 USER를 확인하는 시점은 Product가 생성될 때 1번만 User 정보를 받아오면 된다.
해당 과정은 비동기 처리로 하기에는 적절하지 않은 동기 작업이기에 비동기 처리에 엮을 필요가 없다는 생각이다.
이쯤으로 User 정보 API에 대한 얘기를 마치고 본격적으로 RabbitMQ 작업을 진행하겠다.
처음에 했던 얘기를 다시 가져오자면 RabbitMQ 메시지 처리를 API Gateway를 통해야 하는 줄 알았지만 둘의 역할은 다르다.
- API Gateway는 주로 HTTP 요청을 라우팅하고 인증, 로깅 등의 기능을 제공하는 반면,
- RabbitMQ는 비동기 메시지 큐로, 서비스 간의 직접적인 통신을 줄이고 비동기 이벤트 처리를 가능하게 한다.
따라서 각 서비스가 RabbitMQ를 직접 퍼블리싱(Publish)하고 서브스크라이브(Subscribe)하는 구조로 설계하는 게 일반적이다.
기본적으로 User 서비스에서 User 삭제 이벤트를 발생시키면, Product 서비스가 이를 수신하여 관련 데이터를 정리하는 구조다.
RabbitMQ를 활용하는 이유는 서비스 간의 결합도를 낮추고, 데이터 동기화 문제를 해결하기 위함이다.
먼저 RabbitMQConfig를 통신하려는 서비스 (현재는 User - Product) 양쪽에 작성한다.
@Configuration
public class RabbitMQConfig {
// Exchange 설정
public static final String EXCHANGE = "user.exchange";
// Queue 설정
public static final String USER_UPDATED_QUEUE = "user.updated.queue";
// RoutingKey 설정
public static final String USER_UPDATED_ROUTING_KEY = "user.updated";
// Bean 등록
// exchange 설정
@Bean
public DirectExchange userExchange() {
return new DirectExchange(EXCHANGE);
}
@Bean
public Queue userDeletedQueue() {
return new Queue(USER_DELETED_QUEUE);
}
// binding 설정
@Bean
public Binding userDeletedBinding(Queue userDeletedQueue, DirectExchange exchange) {
return BindingBuilder.bind(userDeletedQueue)
.to(exchange)
.with(USER_DELETED_ROUTING_KEY);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
일단 간략하게 User삭제를 위한 Exchange, Queue, RoutingKey 를 선언해줬다.
일반적으로 아래와 같이 사용되며
Exchange(교환기): 메시지를 전달하는 중심 허브 역할.
Queue(큐): 메시지를 저장하고, 구독한 서비스가 가져가도록 함.
Routing Key(라우팅 키): 특정 큐에 메시지를 전달하기 위한 식별자.
RabbitTemplate: 메시지를 변환하고, RabbitMQ에 전송하는 역할.
해당 서비스에서는 다음과 같은 역할을 가진다.
DirectExchange: 특정 라우팅 키를 가진 메시지를 해당 큐로 보냄.
Queue: 삭제 이벤트를 저장하는 큐.
Binding: RoutingKey를 기준으로 큐를 Exchange에 연결.
RabbitTemplate: JSON 메시지를 변환하여 RabbitMQ로 전송하는 역할.
다음으로는 User와 관련된 Event가 발생되었을 때 이벤트를 발급할 수 있는 코드를 작성하도록 하겠다. (User Service 내부)
@Service
public class UserEventPublisher {
private final RabbitTemplate rabbitTemplate;
@Autowired
public UserEventPublisher(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void publishUserDeleted(String userId) { // 삭제 이벤트 추가
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE,
RabbitMQConfig.USER_DELETED_ROUTING_KEY,
userId
);
}
}
publishUserDeleted(userId): 삭제된 유저 ID를 RabbitMQ에 발행.
Exchange(user.exchange) → Queue(user.deleted.queue)로 메시지가 전달됨.
해당 이벤트를 비지니스로직에 작성하므로써
요청이 들어올 경우 이벤트를 발급한다.
@Transactional
public void deleteUser(String email) {
User user = userRepository.findByEmail(email)
.orElseThrow(() -> new CustomException(ErrorCode.USER_NOT_FOUND));
userRepository.delete(user);
eventPublisher.publishUserDeleted(email);
}
이제 발급받은 event를 수신할 UserEventListener를 Product 서비스 내에 작성한다.
@Component
@RequiredArgsConstructor
public class UserEventListener {
private final ProductRepository productRepository;
@RabbitListener(queues = RabbitMQConfig.USER_DELETED_QUEUE)
public void handleUserDeletedEvent(String userId) {
List<Product> products = productRepository.findByOwner(userId);
if (!products.isEmpty()) {
productRepository.deleteAll(products);
}
}
}