Apache Kafka 기술 공유

강혜성·2023년 4월 26일
0
post-thumbnail

Apache Kafka

  • Spring Boot Project에서 활용되는 Apache Kafka
  • APache Kafka에 대한 개요를 설명한다.
  • 이후 Messaging System에 대해서 간략하게 짚고,
  • 프로젝트에서 어떻게 활용했는지 설명한다.
  • 해당 이해들을 바탕으로 Apache Kafka에 대해서 자세하게 설명한다.

Apache Kafka란?

  • 아파치 카프카(Apache Kafka)는 아파치 소프트웨어 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트이다.
  • 오픈 소스 분산 이벤트 스트리밍 플랫폼

참고자료

https://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EC%B9%B4%ED%94%84%EC%B9%B4
https://kafka.apache.org/

Kafka 탄생

  • 카프카는 2011년 미국 링크드인에서 출발
  • 카프카는 링크드인 웹사이트에서 생성되는 로그를 처리하여 웹사이트 활동을 추적하는 것을 목적으로 개발
  • 웹에서 생성되는 대량의 로그를 분석하여 사용자가 웹에서 하는 활동을 모니터링하고 서비스 개선에 활용하는 목적
  • 링크드인의 실현 목적 ( 요구 사항)
➊ 높은 처리량으로 실시간 처리한다.
➋ 임의의 타이밍에서 데이터를 읽는다.
➌ 다양한 제품과 시스템에 쉽게 연동한다.
➍ 메시지를 잃지 않는다.
  • 실현 수단
➊ 메시징 모델과 스케일 아웃형 아키텍처
➋ 디스크로의 데이터 영속화
➌ 이해하기 쉬운 API 제공
➍ 전달 보증

참고 자료

https://www.hanbit.co.kr/media/channel/view.html?cms_code=CMS9400468504

  • 요약 : 확장 가능하고 안정성이 있으며 호환성이 좋은 메시징 모델이 없어서 Kafka를 개발했다.

Apache Kafka 이해하기

  • Apache Kafka는 분산 스트리밍 플랫폼이다.
  • 데이터 파이프 라인을 만들 때 주로 사용되는 오픈소스 솔루션이다.
  • 대용량의 실시간 로그처리에 특화되어 있는 솔루션이다.
  • 데이터를 유실없이 안전하게 전달하는 것이 주목적인 메세지 시스템에서 Fault-Tolerant한 안정적인 아키텍처와 빠른 퍼포먼스로 데이터를 처리한다.

=> Kafka는 분산 데이터 처리 환경에 적합하다. 실시간 로그 처리에 적합하다. 메시징 시스템이며, 빠른 퍼포먼스로 데이터를 처리한다.

메시징 시스템, Messaging System 이란?

  • 메시징 시스템으로는 Kafka, RabbitMQ, Active MQ, AWS SQS, Java JMS 등이 있다.
  • 메시지를 문자, 이메일이라 생각하면 안됨.
  • 여기서 메시지는 로그 데이터, 데이터, 이벤트 메시지 등 API로 호출할 때 보내는 데이터드르을 처리하는 시스템.

메시징 시스템 참고자료

https://victorydntmd.tistory.com/343

메시지, 메시징에 대한 참고자료

https://smallake.kr/?p=2139

  • 메시징 시스템은 서로 다른 프로그램 끼리 정보를 교환하기 위해 통합채널로 활용한다.

https://velog.io/@pood/%EB%A9%94%EC%8B%9C%EC%A7%95-%EC%8B%9C%EC%8A%A4%ED%85%9C

Zzalu Project에서 Kafka가 사용된 방식

  • 실제 진행한 Zzalu Project에서 Kafka가 어떤 방식으로 사용되었는지 설명한다.

프로젝트 Git Hub

https://github.com/Zzalu/Zzalu

프로젝트 Git Hub - 프로젝트 구조 / 시스템 흐름

https://github.com/Zzalu/Zzalu/tree/main/exec

프로젝트 Apache Kafka 사용 이유

  • 우선 해당 프로젝트에서는 채팅 시스템을 개발하기 위해서 Apache Kafka를 사용했다.
  • 채팅 시스템을 구현하기 위해서, Web Socket/STOMP, Redis, Apache Kafka를 사용했다.
  • 채팅 시스템에서 Kafka를 사용하는 이유는 안정성과 확장성이다.
  1. 채팅 메세지 순서 보장
  2. 모든 내역을 로그로 기록
  3. 서버에 장애가 생길 경우 Kafka가 해당 메세지를 다른 서버로 보내거나, 가지고 있다가 복구되는데로 처리한다.

자세한 설명이 필요하면 아래 자료 참고

https://velog.io/@comet_strike/Kafka-Redis-Web-Socket-Stomp-%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%B1%84%ED%8C%85-%EC%84%9C%EB%B2%84-%ED%9A%8C%EA%B3%A0

프로젝트에서 실제 처리 과정

  • 사용자가 특정 Event를 발생시킬 경우 처리 과정을 설명한다.
  • 해당 프로젝트에서는 채팅 메세지를 보낼경우 Kafka를 이용해 처리하므로 채팅 메세지를 보냈을 경우에 처리 방식을 설명한다.
  1. 사용자가 채팅방에 입장할 때, Server와 Client는 Web Socket으로 연결되어 있다.
  2. 사용자가 채팅 메세지를 보낼 경우, Front에서 특정 URL로 메세지를 보낸다.
  3. Spring Boot Server에서 해당 메세지를 수신한다.
  4. 수신한 메세지를 Kafka Producer를 이용해서 특정 Topic으로 발행한다.
  5. Kafka Server는 이를 수신하고 Kafka Consumer에 전달한다.
    (Kafka Consumer는 Spring Boot Server에서 @KafaListner 어노테이션을 이용해서 구현한다.)
  6. Kafka Consumer는 받은 메세지를 Redis로 발행한다.
  7. Redis SUB에서는 STOMP를 통해 채팅 메세지를 수신한다.

Kafka 사용하는 서비스

Line에서 Kafka

  • LINE에서 Kafka를 사용하는 방법은 크게 두 가지
  1. 단순하게 분산 큐잉 시스템으로 사용
  2. 데이터 허브로 사용

RIDI에서 Kafka

  • 리디에서는 다양한 사업적 요구를 분석하기 위해서 여러 종류의 분석 플랫폼들을 활용하고 있는데, 리디 서비스와 이러한 플랫폼들 간의 데이터 통로로도 Kafka를 활용

Kakao Alex에서 Kafka

  • 메시지 브로커로 카프카를 사용

Kafka 설명 요약

  • Kafka는 링크드인에서 개발한 Messaging System이다.
  • Kafka는 여러 방식으로 사용된다.
  • Kafka를 데이터의 중간 통로로 이용한다.

Kafka 구성

참고자료

https://hoing.io/archives/5108

실제 Spring Boot 프로젝트에 적용하는 방법

1. Docker를 이용한 Kafka 설치

  • Kafka, Zookeeper 설치
  • Git Clone https://github.com/wurstmeister/kafka-docker.git
  • 클론한 폴더에서 docker-compose.yml 파일 수정
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped

  kafka:
    build: .
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      DOCKER_API_VERSION: 1.22
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    restart: unless-stopped
  • 해당 폴더에서 터미널 실행 docker-compose up -d

2. Spring Boot와 연결

  • application.properties에 kafka 내용 추가
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: foo
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

참고자료

https://velog.io/@taehodot/SpringBoot-%EC%B9%B4%ED%94%84%EC%B9%B4%EC%99%80-%EC%8A%A4%ED%94%84%EB%A7%81%EB%B6%80%ED%8A%B8-%EC%97%B0%EB%8F%99

3. 실제 사용 코드

  1. 사용자가 채팅 메세지를 보낼경우 수신 받은 후, Kafkao Producer로 발행
	@MessageMapping("/chat/message")
    public void message(ChatMessageDto message) {

        // 토큰 검사 => 에외 발생 시 Exception
        Member requestMember = jwtTokenProvider.getMember(message.getSender());
        message.setSender(requestMember.getNickname());
        message.setMemberName(requestMember.getUsername());
        message.setProfilePath(requestMember.getProfilePath());
        message.setMemberId(requestMember.getId());
        message.setSendDate(LocalDateTime.now());

        if (ChatMessageDto.MessageType.ENTER.equals(message.getType())) {
            chatRoomRedisRepository.enterChatRoom(message.getRoomId());
        }

        // kafka topic 발행
        kafkaProducer.sendMessage(message);
        // 입장이 아닐때만 저장
        if (!ChatMessageDto.MessageType.ENTER.equals(message.getType())) {
            chatRoomRedisRepository.setChatMessage(message, message.getRoomId());
        }
//        redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
    }

아래 Kafka Producer 코드 참고

@Service
public class KafkaProducer {

    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(Object message){
        this.kafkaTemplate.send(TOPIC, message);
    }

}
  1. Kafka Consumer / Redis Publisher 설정
    @KafkaListener(topics="exam", groupId = "foo")
    public void kafkaListener(String message) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        ChatMessageDto chatMessageDto = objectMapper.readValue(message, ChatMessageDto.class);
        System.out.println("publish : " + chatMessageDto.toString());
        redisTemplate.convertAndSend(((ChannelTopic) chatRoomRedisRepository.getTopic(chatMessageDto.getRoomId())).getTopic(), chatMessageDto);
    }
  • @KafkaListener 어노테이션을 이용해서 SUB 처리
  • redisTemplate.convertAndSend로 PUB 처리
  1. Redis Subscribe 설정 (Topic에 따라 다르게 처리), STOMP PUB 처리
 @Override
    public void onMessage(Message message, byte[] pattern) {
        try {

            String topic = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());

            if(topic.equals("comments")){
                System.out.println("comments");
                CommentResponse commentResponse =  objectMapper.readValue(publishMessage, CommentResponse.class);
                messagingTemplate.convertAndSend("/sub/title-hakwon/comments/", commentResponse);
            }else if (topic.equals("likes")){
                LikeResponse likeResponse = objectMapper.readValue(publishMessage, LikeResponse.class);
                messagingTemplate.convertAndSend("/sub/title-hakwon/comments/likes", likeResponse);
            }else{
                ChatMessageDto roomMessage = objectMapper.readValue(publishMessage, ChatMessageDto.class);
                messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
            }

        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

순서 요약 :
메세지 수신 시, 메세지 처리 후 Kafka Producer를 통해 Kafka Server로 전달.
이후 Consumer를 통해 메세지 처리. Consumer는 Redis Publish 수헹.
Redis Subscribe에서 해당 메세지를 받은 후, STOMP Publish 수행.
STOMP Subscribe는 Client이므로 메세지가 Client에 정상적으로 전달.

0개의 댓글