
💡 Stomp + Kafka로 실시간 채팅 구현하기
앞선 포스팅에서는 Stomp와 RabbitMQ를 이용하여서 실시간 채팅을 구현하였다.
RabbitMQ를 이용해서 Stomp만 사용하였을 때 발생하는 내부 메시지 브로커의 문제를 해결할 수 있었다.
사실 1대1 채팅이나 소규모 채팅방 같은 경우에는 RabbitMQ가 빠른 속도로 메시지를 전송 할 수 있기 때문에 적합하다고 생각한다.
하지만 이번 snail에서 진행하는 "외국인 가정 정착 지원 어플리케이션"은 1대1 채팅방 뿐만 아니라, 모임, 모임 일정등 1대 다 채팅방도 많이 있고, 무엇보다 많은 인원이 채팅에 참가할 수 도 있는 대규모 채팅방의 가능성이 있기 때문에 Kafka를 선택하기로 하였다.
그리고, 우리 서비스는 MSA 아키텍쳐를 사용하기 때문에, MSA에 좀 더 적합한 Kafka를 통해 채팅을 구현하여 보고자 한다.
장점
단점:
장점:
단점:
위와 같은 장단점이 존재하기 때문에 우리의 프로젝트 상 특성을 고려해야하였다.
우리 프로젝트의 특성상 채팅이 중요한 요소였기도 하고, 사용자의 채팅을 확실하게 관리할 수 있는게 더 좋다고 생각하였다.
또한, 생각하여 봤을때, Consumer에서 DB를 저장하게 되면, 하나의 DB에 여러개의 Consumer가 동일한 DB를 저장하려고 접근하는 것이 성능적인 측면을 감소시킬 수 있겠다고생각하였다.
따라서, Producer에서 보낼 메시지를 Kafka 브로커로 전송을 한 후, 전송이 제대로 성공하면 DB에 저장하여, 메시지의 유실을 막아주길 결정하였다.
위 방식은 카프카 초기 설정을 참고하여서 그 이후 진행과정을 기록하였다.
// EC2 서버 접속
$ ssh -v -i "kafka-test.pem" ec2-user@{본인 EC2 Public Ip)
// kakfa 설치된 파일로 이동
$ cd kafka_2.12-2.5.0
// 동작중인 프로세스 확인
$ jps -m
// zookeeper 실행
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
// kafka 실행
$ bin/kafka-server-start.sh -daemon config/server.properties
// kakfa 설치된 파일로 이동
$ cd kafka_2.12-2.5.0
// zookeeper 실행
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
// kafka 실행
$ bin/kafka-server-start.sh -daemon config/server.properties


$ bin/kafka-topics.sh --bootstrap-server {본인 EC2 Public Ip}:9092 --create --topic chat-test --partitions 3
// Kafka Producer로 데이터 넣기
$ bin/kafka-console-producer.sh --bootstrap-server {본인 EC2 Public IP}:9092
--topic chat-service
// Kafka Consumer로 넣은 데이터 확인
$ bin/kafka-console-consumer.sh --bootstrap-server {본인 EC2 Public IP}:9092
--topic chat-service --from-beginning

우선적으로 띄워놓은 EC2 서버와 내가 구현한 채팅 서버와 잘 연결이 되고, KafKa를 통한 통신이 잘 이루어지는 지 확인하기 위해서 EC2 서버 1개를 대상으로 적용을 진행하였다. 향후 이번에 연결한 인스턴스를 Leader Broker로 지정하고, 나머지 2개를 Follower Broker로 지정하여서, replication가 잘 이루어져, 통신이 잘 이루어지는 지 확인해볼 생각이다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String prod_bootstrap_server;
// Kafka에서 메시지를 생성하는 역할을 하는 Producer 객체 생성
@Bean
public ProducerFactory<String, ChatMessageDto> producerFactory() {
Map<String, Object> config = new HashMap<>();
// 카프카 클러스터 주소 세팅 - 현재 열려있는 카프카 브로커 주소
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prod_bootstrap_server);
// serializer key & value 세팅
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
// Kafka Producer 객체 템플릿
@Bean
public KafkaTemplate<String, ChatMessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String cons_bootstrap_server;
@Value("${spring.kafka.consumer.group-id}")
private String cons_group_id;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String cons_auto_offset_reset;
public KafkaConsumerConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
// Kafka 메시지 수신하는 Consumer 객체 생성
@Bean
public ConsumerFactory<String, ChatMessageDto> consumerFactory(){
// Kafka에서 받은 메시지를 서버에 전달할 때 ChatMessageDto 객체로 변환해주는 Json 역질렬화
JsonDeserializer<ChatMessageDto> deserializer = new JsonDeserializer<>();
// 역질렬화 할 수 있는 패키지 지정
deserializer.addTrustedPackages("com.song.chatpractice.kafka.dto");
Map<String,Object> props = new HashMap<>();
// 카프카 클러스터 주소 세팅 - 현재 열려있는 카프카 브로커 주소
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cons_bootstrap_server);
// 카프카 Consumer group Id 세팅 - 모든 서버에 메시지 전송을 위해서는 모두 다른 아이디 설정 필요
props.put(ConsumerConfig.GROUP_ID_CONFIG, cons_group_id);
// 카프카 Consumer offset 세팅 ( offset을 사용할 수 없거나 정보를 찾을 수 없을 떄 option)
// earlist - 가장 처음부터 읽기 , latest - 가장 마지막부터 읽기
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, cons_auto_offset_reset);
// 카프카 deserializer 설정
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
// Kafka 메시지를 수신 & 처리하는 리스너 컨테이너
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
/* 설명. Kafka Producer의 sendMessage */
@Override
public void sendMessage(String roomId, ChatMessageDto chatMessageDto) {
// Kafka로 메시지 전송 (다른 서버들도 메시지를 받을 수 있도록)
String topicName = appName; // Kafka Topic 설정
log.info("메시지 보낸 토픽 이름 {}: 보낸 메시지 {}", topicName, chatMessageDto);
// ListenableFuture가 deprecated 되었음으로 대체
CompletableFuture<SendResult<String, ChatMessageDto>> future
// Kafka로 메시지 전송 ( 비동기 통신은 try-catch보다 completableFuture 사용)
= kafkaTemplate.send(topicName, roomId, chatMessageDto);
future.whenComplete((result, ex) -> {
if (ex == null){
log.info("메시지가 {}에 잘 전송되었습니다.", topicName);
// 메시지 전송 성공시 DB에 저장
ChatMessage chatMessage = new ChatMessage();
chatMessage.setRoomId(chatMessageDto.getRoomId());
chatMessage.setSender(chatMessageDto.getSender());
chatMessage.setMessage(chatMessageDto.getMessage());
chatMessage.setType(ChatMessage.MessageType.valueOf(chatMessageDto.getType().name()));
chatMessage.setSendDate(LocalDateTime.now());
chatMessageRepository.save(chatMessage);
log.info("{}가 잘 저장되었습니다..", chatMessage);
} else {
log.error("Kafka 토픽에 메시지 전송 실패 {}: {}",topicName, chatMessageDto, ex);
}
});
}
/* 설명. Kafka Consumer의 receiveMessage */
@KafkaListener(topics = "${APP_NAME}", groupId = "${KAFKA_GROUP_ID}")
public void receiveMessage(ChatMessageDto chatMessageDto) {
log.info("Received message in group {}: {}", "groupId", chatMessageDto);
// 모든 WebSocket 서버가 Kafka 메시지를 받아서 WebSocket으로 전달!
simpMessagingTemplate.convertAndSend("/topic/room/" + chatMessageDto.getRoomId(), chatMessageDto);
}
위의 추가적인 코드들을 통해서 다른 서버의 Kafka Broker를 등록하고 해당 경로로 데이터를 보내고 받으면서, 채팅 서버를 구성하였다.
