Kafka + STOMP

5w31892p·2023년 4월 29일
0

Kafka

목록 보기
2/2

Kafka 설치

  1. apache kafka 설치
  2. $ wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
    2-1. (wget 명령어를 찾을 수 없는 경우) $ brew install wget
  3. $ tar xvf kafka_2.13-2.8.0.tgz (압축풀기)

Kafka 시작, 종료

  1. Zookeeper 구동
    $ bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Kafka 구동
    $ bin/kafka-server-start.sh -daemon config/server.properties
  3. 종료 > ctrl + c
    producer, consumer, kafka, zookeeper 순으로
  4. 생성한 이벤트를 포함하여 로컬 Kafka 환경의 데이터도 삭제
    $ rm -rf /tmp/kafka-logs /tmp/zookeeper

Kafka 시작
zookeeper > kafka > intelliJ > Apic

Kafka 종료
Apic > intelliJ > kafka > zookeeper

zookeeper 구동 후 바로 kafka 구동하면 에러 발생함, 조금 시간 차이를 두고 구동하기


기존 Chatting 관련 Controller, Service, Repository는 유지하고,
Producer와 Consumer와 관련된 Config 파일과 Service 파일만 추가한다.

Producer > 보내는 것
Consumer > 받는 것

properties 설정

# Kafka 서버 정보
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092

# Producer 설정
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Consumer 설정
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
  • Kafka 서버 정보
    • 카프카 서버의 IP : 포트번호
  • Producer
    • key,value-deserializer : 반대로 카프카에 데이터를 보낼 때 하는 직렬화 설정
  • Consumer
    • auto-offset-reset : 컨슈머 그룹에서 소비할 offset 정보가 없을 때, 어떻게 offset을 리셋할 것인지에 대한 설정.
      • latest : 가장 최근 메시지로
      • earliest : 가장 오래된 메시지로
      • none : offset 정보가 없으면 Exception 발생시
    • key, value-deserializer : 카프카에서 메세지를 받아올 때, 역직렬화 설정. String이면 StringDeserializer, JSON형식을 받아오면 JsonDeserializer.
      kafkaTemplate의 key-value를 말함.

Service

  • Producer
    • KafkaTemplate의 send를 통해 kafka 서버로 토픽과 메시지 전송
    kafkaTemplate.send(TOPIC, messageDetails);
  • Consumer
    • @KafkaListener 어노테이션을 통해 Kafka 서버로부터 메시지 수신
    @KafkaListener(topics = "topic", groupId = "chat-group")

최종 테스트 결과

  • Connect도 정상적으로 되고, 메시지도 잘 오가는 것 확인

소스코드


람다식 변경

// 전
Set<ChatMessageDto> messageList = new LinkedHashSet<>();
for (ChatMessage messages : room.getMessage()) {
	messageList.add(new ChatMessageDto(messages));
}
this.message = messageList;


// 후
this.message = chatRoom.getMessages().stream()
    .map(MessageDetails::new)
    .sorted(Comparator.comparing(MessageDetails::getSendTime))
    .collect(Collectors.toCollection(LinkedHashSet::new));

0개의 댓글