이번 포스팅에서는 MSA 전환 후 채팅 마이크로 서비스 개발 하면서 겪었던 트러블 슈팅 두가지에 대해 정리 해볼려고 합니다.
👉 참고 코드 : https://github.com/LminWoo99/PlantBackend/tree/msa-master
채팅 마이크로 서비스를 개발하면서 SSE를 활용한 알림 기능을 구현했었습니다. 보통 SSE 통신을 하는 동안은 HTTP Connection
이 계속 열려있습니다. 만약 SSE 연결 응답 API에서 JPA
를 사용하고 open-in-view 속성이 true로 되어있다면, HTTP Connection이 열려있는 동안 DB Connection
도 같이 열려있게 됩니다.
즉 DB Connection Pool
에서 최대 10개의 Connection을 사용할 수 있다면, 10명의 클라이언트가 SSE 연결 요청을 하는 순간 DB 커넥션도 고갈되게 됩니다. 따라서 이 경우 open-in-view
설정을 반드시 false
로 설정해야 합니다.
단순히 이런 문제점만 생각하고 false
처리를 해버리면 DB Connection이 해제되지 않는 문제는 해결될 것입니다. 하지만 그럴 경우에는 lazy loading
을 사용할 때 문제가 발생할 수 있습니다. EntityManger 가 transaction의 commit
과 함께 닫히기 때문에, 트랜잭션 외부에서 lazy loading
을 사용하면 no Session 이라는 예외가 발생하게 됩니다. 이러한 문제점까지 고려해서 서비스의 요구사항에 맞게 문제점을 해결해야 합니다!
프로젝트를 진행하면서, MSA의 장점을 살리기 위해 서비스마다 단일 DB를 사용하고 있습니다. 현재 개발 완료된 서비스는 기존 모놀리식 서비스였던 중고 식물 거래 서비스
와 채팅 서비스
가 있습니다.
그래서 중고 거래 게시글에 여러개의 채팅방 채팅 내역을 가지게 됩니다. 하지만 게시글과 채팅방 및 채팅 내역들은 서로 다른 DB에 저장되어 관리하고 있습니다.
여기서 문제되는 점이 중고 식물 거래 게시글이 삭제될 경우 해당 게시글의 채팅방, 채팅 내역 데이터들도 연쇄적으로 삭제되게 구현하고 싶었습니다.
즉, 서로 다른 데이터베이스의 동기화가 필요한 상황에서 두 가지 해결 방식을 고민했습니다.
서비스 간의 결합도를 낮추며, 각 서비스가 독립적으로 확장될 수 있도록하는 kafka를 통한 동기화가 적합하다고 판단하여 적용했습니다
public void deletePost(TradeBoardDto tradeBoardDto) {
tradeBoardRepository.delete(tradeBoardDto.toEntity());
/*send this deletePost to the kafka*/
kafkaProducer.send("deletePost", tradeBoardDto);
}
===================================================
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 중고 거래 게시글 삭제시 카프카를 통해 plant-chat-service 마이크로서비스에 전달
* @param : String topic, TradeBoardDto tradeBoardDto
*/
public TradeBoardDto send(String topic, TradeBoardDto tradeBoardDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(tradeBoardDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer send data from the Plant microservice " + tradeBoardDto);
return tradeBoardDto;
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final ChatService chatService;
/**
* plant-service 게시글에 속한 채팅 데이터 kafka를 통해 삭제 메서드
*
* @param : MemberDto memberDto, ChatRequestDto requestDto
*/
@KafkaListener(topics = "deletePost")
public void deleteChat(String kafkaMessage) {
log.info("Kafka Message : ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
});
//TradeBoard id값 가져오기
Integer tradeBoardNo = (Integer) map.get("id");
if (tradeBoardNo == null) {
throw new IllegalArgumentException("TradeBoard ID is missing in the Kafka message");
}
chatService.deleteChatRoom(tradeBoardNo);
} catch (JsonProcessingException e) {
log.error("Error parsing Kafka message: {}", kafkaMessage, e);
} catch (IllegalArgumentException e) {
log.error("Validation error for Kafka message: {}", kafkaMessage, e);
} catch (Exception e) {
log.error("Error processing Kafka message: {}", kafkaMessage, e);
}
}
}
===================================================
@Transactional
public void deleteChatRoom(Integer tradeBoardNo) {
List<Integer> chatRoomNoList = chatRepository.deleteChatRoomAndReturnChatNo(tradeBoardNo);
deleteChatting(chatRoomNoList);
}
@Transactional
public void deleteChatting(List<Integer> chatRoomNoList) {
// Set 자료구조를 활용하여 중복된 chatNo 제거
Set<Integer> uniqueChatNoSet = new HashSet<>(chatRoomNoList);
// 중복 제거 후의 chatNo에 해당하는 채팅 데이터 삭제
mongoTemplate.remove(query(where("chatRoomNo").in(uniqueChatNoSet)), Chatting.class);
}
위와 같은 방식으로 Kafka를 통해 MSA간 단일 데이터베이스의 동기화 문제를 해결했습니다.