$ wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
$ brew install wget
$ tar xvf kafka_2.13-2.8.0.tgz
(압축풀기)$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
Kafka 시작
zookeeper > kafka > intelliJ > ApicKafka 종료
Apic > intelliJ > kafka > zookeeper
zookeeper 구동 후 바로 kafka 구동하면 에러 발생함, 조금 시간 차이를 두고 구동하기
기존 Chatting 관련 Controller, Service, Repository는 유지하고,
Producer와 Consumer와 관련된 Config 파일과 Service 파일만 추가한다.
Producer > 보내는 것
Consumer > 받는 것
# Kafka 서버 정보
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092
# Producer 설정
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer 설정
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
kafkaTemplate.send(TOPIC, messageDetails);
@KafkaListener
어노테이션을 통해 Kafka 서버로부터 메시지 수신@KafkaListener(topics = "topic", groupId = "chat-group")
// 전
Set<ChatMessageDto> messageList = new LinkedHashSet<>();
for (ChatMessage messages : room.getMessage()) {
messageList.add(new ChatMessageDto(messages));
}
this.message = messageList;
// 후
this.message = chatRoom.getMessages().stream()
.map(MessageDetails::new)
.sorted(Comparator.comparing(MessageDetails::getSendTime))
.collect(Collectors.toCollection(LinkedHashSet::new));