Kafka - Spring boot 실습 정리

이보혁·2024년 10월 20일

Kafka docker compose

기본 설정

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    depends_on:
      - zookeeper

Spring Boot 초기 설정

  • Spring Initializr 에서 Dependencies에 Spring Web, Spring for Apache Kafka 추가한 후 프로젝트 생성
  • 생성된 프로젝트에서 application.yml 파일에 Kafka 설정을 추가
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer-Consumer 패턴 구현

메시지를 생성하고 처리하는 패턴으로, 모든 메시지 처리는 send 시 함께 보내는 topic을 기준으로 전달된다.
각 topic에 대해서 consumer는 group id를 기준으로 한 번씩 처리된다.

예를 들어, example_topic이라는 topic 기준으로 생성된 메시지는 group_id1, group_id2로 consumer가 붙어있다면 각각 처리되어 2번의 함수 실행이 유발된다는 뜻

Producer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final String TOPIC = "example_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        System.out.println("Producing message: " + message);
        kafkaTemplate.send(TOPIC, message);
    }
}

Consumer

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "example_topic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

파티션과 컨슈머 그룹

파티션

  • Kafka의 Topic은 여러 파티션으로 구성되어 있으며, 각 파티션은 순서가 보장된다.
  • 파티션은 늘릴수는 있지만, 줄일 수는 없다. [[Kafka 파티션 줄이지 못하는 이유]]

컨슈머 그룹

  • Kafka에서는 하나의 토픽에 대해 여러 컨슈머를 둘 수 있는데, 컨슈머들을 그룹으로 묶을 수 있다.
    • 같은 그룹에 속한 컨슈머들은 서로 메시지를 나눠서 처리한다.
    • 즉, 다른 그룹을 만들면 메시지를 중복처리 할 수 있다.
예시
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerGroup1 {

    @KafkaListener(topics = "example_topic", groupId = "group_id_1")
    public void consumeGroup1(String message) {
        System.out.println("Consumer Group 1 - Consumed message: " + message);
    }
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerGroup2 {

    @KafkaListener(topics = "example_topic", groupId = "group_id_2")
    public void consumeGroup2(String message) {
        System.out.println("Consumer Group 2 - Consumed message: " + message);
    }
}

데이터 압축

압축 방식

Kafka에서는 3가지 압축 방식을 지원한다.
다만 압축하려는 데이터 타입에 따라 압축률과 속도가 다를 수 있다.
예를 들어, 텍스트 데이터는 압축률이 높게 나오지만, 바이너리 데이터나 이미 압축된 데이터는 압축률이 낮게 나올 수 있다.

GZIP

  • 압축률 높음, 압축 속도 느림
  • 압축률이 높아서 네트워크 대역폭을 절약할 수 있다.
  • 사용 사례: 대용량 데이터 전송, 대용량 로그 전송 등

Snappy

  • 압축률 중간, 압축 속도 빠름
  • 낮은 CPU를 사용하면서도 상대적으로 괜찮은 압축률을 제공한다.
  • 사용 사례: 데이터 처리 속도가 중요한 실시간 애플리케이션, 데이터 스트리밍, 로그 전송 등

LZ4

  • 압축률 낮음, 압축 속도 매우 빠름
  • 거의 실시간으로 압축/해제가 가능
  • 사용 사례: 초고속 데이터 처리가 필요한 환경, 저지연성이 중요한 실시간 데이터 스트리밍, 대규모 메시지 브로커 시스템 등

압축 설정

Producer 측에만 설정하면 된다.
Consumer는 압축된 데이터를 자동으로 해제한다.

spring:
  kafka:
    producer:
      compression-type: snappy

0개의 댓글