참고) https://dkswnkk.tistory.com/705
https://velog.io/@raddaslul/Stomp%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%98%EC%97%AC-%EC%B1%84%ED%8C%85-%EB%B0%8F-item-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0#8-redissubscriber
https://velog.io/@ha0kim/%EC%8A%A4%ED%94%84%EB%A7%81-%EC%9D%B8-%EC%95%A1%EC%85%98-8.%EB%B9%84%EB%8F%99%EA%B8%B0-%EB%A9%94%EC%8B%9C%EC%A7%80-%EC%A0%84%EC%86%A1%ED%95%98%EA%B8%B0
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 소비자도 yml로 설정가능
# kafka:
# consumer:
# bootstrap-servers: localhost:9092 # Kafka 클러스에 대한 초기 연결에 사용할 호스트 : 포트 목록
# group-id: consumer_group01 # Group Id
# auto-offset-reset: earliest # offset 이 없거나 더 이상 없는 경우 어떻게 처리할지 전략 결정
## Deserialze 방법은 KafkaConsumerConfig 로 설정
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
public class KafkaConstants {
public static final String KAFKA_AI_TOPIC = "chat-ai-topic";
public static final String KAFKA_TOPIC = "chat-topic";
public static final String GROUP_ID = "chat-group";
public static final String KAFKA_BROKER = "localhost:9092";
public static List<Integer> partitionList;
}
@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
ContainerProperties prop = factory.getContainerProperties();
prop.setConsumerRebalanceListener(rebalanceListener());
return factory;
}
@Bean
public ConsumerFactory<String, ChatMessageDto> consumerFactory() {
final Map<String, Object> config = new HashMap<>();
JsonDeserializer<ChatMessageDto> deserializer = new JsonDeserializer<>(ChatMessageDto.class);
deserializer.addTrustedPackages("*");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
return new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// here partitions
List<Integer> partList = new ArrayList<>();
for (TopicPartition partition : partitions) {
int partition1 = partition.partition();
partList.add(partition1);
log.info("사용중인파티션:{}", partition1);
}
KafkaConstants.partitionList = partList;
}
};
}
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, ChatMessageDto> producerFactory(){
return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
}
@Bean
public Map<String, Object> kafkaProducerConfiguration() {
final Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
@Bean
public KafkaTemplate<String, ChatMessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class MessageReceiverService {
private final SimpMessageSendingOperations template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void receiveMessage(ChatMessageDto message) {
try {
// 메세지객체 내부의 채팅방 ID 참조 -> 구독자에게 메세지 발송
template.convertAndSend("/sub/chat/room/" + message.getRoomId(), message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@KafkaListener(topics = KafkaConstants.KAFKA_AI_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void receiveAIMessage(ChatMessageDto message) {
try {
// 메세지객체 내부의 채팅방 ID 참조 -> 구독자에게 메세지 발송
template.convertAndSend("/sub/chat/ai/room/" + message.getRoomId(), message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageSenderService {
private final KafkaTemplate<String, ChatMessageDto> kafkaTemplate;
private final ChatMessageService chatService;
private final ChatRoomService chatRoomService;
private final ChatRoomUserService chatRoomUserService;
private final ChatClient chatClient;
public void send(ChatMessageDto message) {
message.setCreateDate(LocalDateTime.now());
// Kafka Template 을 사용하여 메세지를 지정된 토픽으로 전송
try {
if (message.getType().equals(MessageType.ENTER) && chatRoomUserService.isUserAlreadyInRoom(message.getRoomId(), message.getLoginId())) {
chatRoomUserService.activateUser(message);
}
if (message.getType().equals(MessageType.TALK)) {
chatService.save(message);
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
chatRoomUserService.getUserInactiveAndNotNotified(message);
}
else if (message.getType().equals(MessageType.ENTER) && !chatRoomUserService.isUserAlreadyInRoom(message.getRoomId(), message.getLoginId())) {
message.setMessage(message.getSender() +"님 입장!!");
chatRoomService.addUserInChat(message.getRoomId(), message.getLoginId());
chatService.save(message);
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
} else if (message.getType().equals(MessageType.LEAVE)) {
message.setMessage(message.getSender() + "님 퇴장!!");
chatRoomService.deleteUserInChat(message.getRoomId(), message.getLoginId());
chatService.save(message);
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}