참고자료
https://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EC%B9%B4%ED%94%84%EC%B9%B4
https://kafka.apache.org/
➊ 높은 처리량으로 실시간 처리한다.
➋ 임의의 타이밍에서 데이터를 읽는다.
➌ 다양한 제품과 시스템에 쉽게 연동한다.
➍ 메시지를 잃지 않는다.
➊ 메시징 모델과 스케일 아웃형 아키텍처
➋ 디스크로의 데이터 영속화
➌ 이해하기 쉬운 API 제공
➍ 전달 보증
참고 자료
https://www.hanbit.co.kr/media/channel/view.html?cms_code=CMS9400468504
=> Kafka는 분산 데이터 처리 환경에 적합하다. 실시간 로그 처리에 적합하다. 메시징 시스템이며, 빠른 퍼포먼스로 데이터를 처리한다.
메시징 시스템 참고자료
메시지, 메시징에 대한 참고자료
https://velog.io/@pood/%EB%A9%94%EC%8B%9C%EC%A7%95-%EC%8B%9C%EC%8A%A4%ED%85%9C
프로젝트 Git Hub
프로젝트 Git Hub - 프로젝트 구조 / 시스템 흐름
Web Socket/STOMP
, Redis
, Apache Kafka
를 사용했다.자세한 설명이 필요하면 아래 자료 참고
Line
https://engineering.linecorp.com/ko/blog/how-to-use-kafka-in-line-1
RIDI
Kakao Alex
참고자료
https://github.com/wurstmeister/kafka-docker.git
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
restart: unless-stopped
kafka:
build: .
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
DOCKER_API_VERSION: 1.22
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_MESSAGE_MAX_BYTES: 10000000
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
restart: unless-stopped
docker-compose up -d
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: foo
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
참고자료
@MessageMapping("/chat/message")
public void message(ChatMessageDto message) {
// 토큰 검사 => 에외 발생 시 Exception
Member requestMember = jwtTokenProvider.getMember(message.getSender());
message.setSender(requestMember.getNickname());
message.setMemberName(requestMember.getUsername());
message.setProfilePath(requestMember.getProfilePath());
message.setMemberId(requestMember.getId());
message.setSendDate(LocalDateTime.now());
if (ChatMessageDto.MessageType.ENTER.equals(message.getType())) {
chatRoomRedisRepository.enterChatRoom(message.getRoomId());
}
// kafka topic 발행
kafkaProducer.sendMessage(message);
// 입장이 아닐때만 저장
if (!ChatMessageDto.MessageType.ENTER.equals(message.getType())) {
chatRoomRedisRepository.setChatMessage(message, message.getRoomId());
}
// redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
}
아래 Kafka Producer 코드 참고
@Service
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(Object message){
this.kafkaTemplate.send(TOPIC, message);
}
}
@KafkaListener(topics="exam", groupId = "foo")
public void kafkaListener(String message) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
ChatMessageDto chatMessageDto = objectMapper.readValue(message, ChatMessageDto.class);
System.out.println("publish : " + chatMessageDto.toString());
redisTemplate.convertAndSend(((ChannelTopic) chatRoomRedisRepository.getTopic(chatMessageDto.getRoomId())).getTopic(), chatMessageDto);
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String topic = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
if(topic.equals("comments")){
System.out.println("comments");
CommentResponse commentResponse = objectMapper.readValue(publishMessage, CommentResponse.class);
messagingTemplate.convertAndSend("/sub/title-hakwon/comments/", commentResponse);
}else if (topic.equals("likes")){
LikeResponse likeResponse = objectMapper.readValue(publishMessage, LikeResponse.class);
messagingTemplate.convertAndSend("/sub/title-hakwon/comments/likes", likeResponse);
}else{
ChatMessageDto roomMessage = objectMapper.readValue(publishMessage, ChatMessageDto.class);
messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
순서 요약 :
메세지 수신 시, 메세지 처리 후 Kafka Producer를 통해 Kafka Server로 전달.
이후 Consumer를 통해 메세지 처리. Consumer는 Redis Publish 수헹.
Redis Subscribe에서 해당 메세지를 받은 후, STOMP Publish 수행.
STOMP Subscribe는 Client이므로 메세지가 Client에 정상적으로 전달.