TIL : Kafka: 포괄적인 개요와 실습

Skadi·2024년 8월 22일
0

1. Kafka 소개


1.1 Kafka란 무엇인가?

Kafka는 실시간 데이터 피드를 처리하기 위해 설계된 분산 스트리밍 플랫폼입니다. 원래 LinkedIn에서 개발되어 Apache Software Foundation에 오픈 소스로 기여된 Kafka는 실시간 데이터 파이프라인 구축과 스트리밍 애플리케이션에 널리 사용됩니다. 기존의 메시지 브로커와 달리, Kafka는 대규모 데이터를 처리하는 데 최적화되어 있으며, 많은 빅 데이터 아키텍처에서 중요한 역할을 합니다.

1.2 Kafka의 주요 역할

  • 실시간 데이터 처리: Kafka는 대규모 데이터를 실시간으로 처리하고 분석할 수 있어, 데이터에서 즉각적인 인사이트를 도출해야 하는 애플리케이션에 적합합니다.
  • 데이터 통합: Kafka는 다양한 소스에서 데이터를 수집하고 통합하여 분석할 수 있도록 지원합니다.
  • 내결함성: Kafka는 데이터 손실 없이 안정적으로 데이터를 저장하고 전송하며, 하드웨어 장애 발생 시에도 데이터를 보호하기 위해 여러 브로커에 데이터를 복제합니다.

1.3 Kafka의 장단점

1.3.1 장점

  • 신뢰성:

    • 데이터 복제: Kafka는 데이터를 여러 브로커에 복제하여 저장하므로, 브로커 장애 시에도 데이터 손실을 방지할 수 있습니다.
    • 확인 메커니즘: Kafka는 데이터가 소비자에게 성공적으로 전달되었는지 확인하는 기능을 제공합니다.
  • 유연성:

    • 다양한 소비자 패턴: Kafka는 여러 소비자가 동시에 데이터를 구독할 수 있도록 지원합니다.
    • 프로토콜 지원: Kafka는 자체 프로토콜을 주로 사용하지만, 다양한 클라이언트를 통해 다른 언어에서도 사용할 수 있습니다.
  • 확장성:

    • 분산 시스템: Kafka의 아키텍처는 데이터를 여러 노드에 분산 처리할 수 있어 대규모 데이터 처리에 적합합니다.
    • 수평 확장성: Kafka는 브로커와 파티션을 추가하여 쉽게 확장할 수 있습니다.
  • 성능:

    • 높은 처리량: Kafka는 대규모 데이터를 빠르게 처리할 수 있어 고처리량을 요구하는 사용 사례에 적합합니다.
    • 저지연: Kafka는 데이터 전송 지연을 최소화하여 실시간 데이터 처리가 가능합니다.
  • 관리 및 모니터링:

    • 관리 도구: Kafka는 다양한 관리 도구를 제공하여 클러스터를 모니터링하고 관리할 수 있습니다.
    • 플러그인 시스템: Kafka는 플러그인을 통해 기능을 확장할 수 있습니다.

1.3.2 단점

  • 설정 및 운영 복잡성:

    • 복잡한 설정: Kafka의 초기 설정은 다소 복잡할 수 있으며, 클러스터링 및 분산 환경에서는 더 많은 설정이 필요할 수 있습니다.
    • 운영 관리: 대규모 환경에서 Kafka를 운영하고 관리하는 데는 추가적인 자원과 전문 지식이 필요할 수 있습니다.
  • 성능 문제:

    • 브로커 오버헤드: 높은 트래픽 상황에서는 브로커에 오버헤드가 발생하여 성능 저하가 발생할 수 있습니다.
    • 대규모 메시지 처리: 매우 대규모 메시지를 처리할 때 성능 저하가 발생할 수 있으며, 이러한 경우 적절한 클러스터링 및 최적화가 필요합니다.
  • 운영 비용:

    • 리소스 소비: Kafka는 메모리와 CPU 자원을 많이 소비할 수 있어, 충분한 하드웨어 자원을 제공해야 원활하게 운영될 수 있습니다.
    • 지속적인 모니터링과 유지보수: Kafka는 지속적인 모니터링과 유지보수가 필요하며, 이를 위해 추가적인 인력과 비용이 발생할 수 있습니다.
  • 러닝 커브:

    • 학습 필요성: Kafka의 개념과 설정을 이해하는 데 시간이 걸릴 수 있으며, 다소 난이도가 높습니다.

2. Kafka의 핵심 구성 요소


2.1 메시지 (Message)

  • 정의: 메시지는 Kafka에서 전달되는 기본 데이터 단위로, 로그 데이터, 이벤트 데이터 등이 될 수 있습니다.
  • 구성: 메시지는 키(key), 값(value), 타임스탬프(timestamp), 그리고 몇 가지 메타데이터로 구성됩니다.

2.2 프로듀서 (Producer)

  • 역할: 프로듀서는 메시지를 생성하고 Kafka로 보내는 역할을 합니다. 예를 들어, 웹 애플리케이션이 로그 데이터를 Kafka에 전송하는 경우 프로듀서가 됩니다.
  • 기능: 프로듀서는 특정 토픽(topic)에 메시지를 전송합니다.

2.3 토픽 (Topic)

  • 정의: 토픽은 메시지가 전송되고 소비자에게 읽혀지는 논리적인 채널입니다.
  • 구조: 토픽은 여러 파티션(partition)으로 나누어질 수 있으며, 각 파티션은 메시지를 순서대로 저장합니다. 이 파티셔닝을 통해 병렬 처리가 가능합니다.
  • 예시: "user-activity"라는 토픽은 사용자의 활동 로그를 저장할 수 있습니다.

2.4 파티션 (Partition)

  • 역할: 파티션은 토픽을 물리적으로 나눈 단위로, 각 파티션은 독립적으로 메시지를 저장하고 관리합니다. 파티션 내의 메시지는 고유한 오프셋(offset)으로 식별됩니다.
  • 기능: 파티션을 통해 데이터를 병렬로 처리할 수 있으며, 클러스터 내의 여러 브로커에 분산시켜 저장할 수 있습니다.

2.5 키 (Key)

  • 목적: 키는 메시지를 특정 파티션에 할당하는 데 사용됩니다. 동일한 키를 가진 메시지는 항상 동일한 파티션에 저장됩니다.
  • 예시: 특정 사용자 ID를 키로 사용하여 해당 사용자의 모든 이벤트가 동일한 파티션에 저장되도록 할 수 있습니다.

2.6 컨슈머 (Consumer)

  • 역할: 컨슈머는 토픽에서 메시지를 읽어 처리하는 역할을 합니다. 컨슈머는 특정 컨슈머 그룹(consumer group)에 속하며, 같은 그룹에 속한 컨슈머들은 토픽의 파티션을 분산 처리합니다.
  • 스티키 파티셔닝: 컨슈머는 일반적으로 특정 파티션에 붙어 계속해서 데이터를 처리하는 스티키 파티셔닝을 사용하여, 데이터 지역성을 높이고 캐시 히트율을 증가시켜 처리 성능을 향상시킵니다.

2.7 브로커 (Broker)

  • 정의: 브로커는 Kafka 클러스터의 서버로, 메시지를 저장하고 전송하는 역할을 합니다.
  • 기능: 하나의 Kafka 클러스터는 여러 브로커로 구성될 수 있으며, 각 브로커는 하나 이상의 토픽 파티션을 관리합니다.

2.8 주키퍼 (Zookeeper)

  • 역할: 주키퍼는 Kafka 클러스터를 관리하고 조정하는 분산 코디네이션 서비스입니다. 브로커의 메타데이터를 저장하고 브로커 간의 상호작용을 조정합니다.

3. Kafka와 RabbitMQ 비교


3.1 설계 철학

  • RabbitMQ: 안정적인 메시지 전달과 큐잉에 중점을 둔 전통적인 메시지 브로커입니다.
  • Kafka: 대규모 실시간 데이터 스트림의 저장과 분석에 중점을 둔 분산 스트리밍 플랫폼입니다.

3.2 메시지 모델

  • RabbitMQ: 큐(queue)를 중심으로 메시지를 전달합니다. 메시지는 큐에 저장되고, 큐에서 하나 이상의 컨슈머에게 전달됩니다.
  • Kafka: 토픽(topic)을 중심으로 메시지를 저장합니다. 메시지는 토픽의 파티션에 저장되고, 컨슈머는 이 파티션에서 메시지를 읽습니다.

3.3 메시지 지속성

  • RabbitMQ: 메시지를 메모리나 디스크에 저장할 수 있으며, 일반적으로 단기 저장을 목표로 합니다.
  • Kafka: 메시지를 디스크에 저장하며, 장기 저장을 목표로 합니다. 데이터 로그는 설정된 기간 동안 보존됩니다.

3.4 사용 사례

  • RabbitMQ: 작업 큐, 요청/응답 패턴, 비동기 작업 처리 등 전통적인 메시지 큐 사용 사례에 적합합니다.
  • Kafka: 실시간 데이터 스트리밍, 로그 수집 및 분석, 이벤트 소싱 등 대규모 데이터 스트림 처리에 적합합니다.

4. Kafka 실습


**4.1

Kafka 설치**

4.1.1 Docker Compose를 사용한 설치

  1. Docker Compose 파일 생성: 아래 내용을 docker-compose.yml로 저장합니다.

    version: '3.8'
    services:
      zookeeper:
        image: wurstmeister/zookeeper:3.4.6
        platform: linux/amd64
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: wurstmeister/kafka:latest
        platform: linux/amd64
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
          KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    
      kafka-ui:
        image: provectuslabs/kafka-ui:latest
        platform: linux/amd64
        ports:
          - "8080:8080"
        environment:
          KAFKA_CLUSTERS_0_NAME: local
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
          KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
          KAFKA_CLUSTERS_0_READONLY: "false"
  2. Docker Compose 실행: Kafka와 Zookeeper 컨테이너를 시작하려면 아래 명령어를 실행합니다.

    docker compose up -d
  3. Kafka UI 접속: 컨테이너가 실행된 후, 브라우저에서 localhost:8080에 접속하여 Kafka UI에 접속할 수 있습니다.

4.2 프로듀서 애플리케이션 구축

4.2.1 프로젝트 설정

  1. Spring Boot 프로젝트 생성: start.spring.io를 사용하여 새로운 프로젝트를 생성합니다. Web과 Spring Kafka 의존성을 포함합니다.

  2. Gradle 디펜던시 설정: build.gradle 파일에 아래와 같은 디펜던시를 추가합니다.

    dependencies {
    	implementation 'org.springframework.boot:spring-boot-starter-web'
    	implementation 'org.springframework.kafka:spring-kafka'
    	compileOnly 'org.projectlombok:lombok'
    	annotationProcessor 'org.projectlombok:lombok'
    	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    	testImplementation 'org.springframework.kafka:spring-kafka-test'
    	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    }

4.2.2 설정

  1. 애플리케이션 설정: application.properties 파일에서 Kafka 프로듀서 설정을 아래와 같이 구성합니다.

    spring.application.name=producer
    server.port=8090
    
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  2. Kafka 설정 클래스 작성: ProducerApplicationKafkaConfig.java 파일을 생성하고 아래 코드를 추가합니다.

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class ProducerApplicationKafkaConfig {
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
  3. 프로듀서 컨트롤러 작성: ProducerController.java 파일을 생성하고 아래 코드를 추가합니다.

    import lombok.RequiredArgsConstructor;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequiredArgsConstructor
    public class ProducerController {
    
        private final ProducerService producerService;
    
        @GetMapping("/send")
        public String sendMessage(@RequestParam("topic") String topic,
                                  @RequestParam("key") String key,
                                  @RequestParam("message") String message) {
            producerService.sendMessage(topic, key, message);
            return "Message sent to Kafka topic";
        }
    }
  4. 프로듀서 서비스 작성: ProducerService.java 파일을 생성하고 아래 코드를 추가합니다.

    import lombok.RequiredArgsConstructor;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    @RequiredArgsConstructor
    public class ProducerService {
    
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String key, String message) {
            for (int i = 0; i < 10; i++) {
                kafkaTemplate.send(topic, key, message + " " + i);
            }
        }
    }

4.3 컨슈머 애플리케이션 구축

4.3.1 프로젝트 설정

  1. Spring Boot 프로젝트 생성: start.spring.io에서 새로운 프로젝트를 생성하고, Web과 Spring Kafka 의존성을 추가합니다.

  2. Gradle 디펜던시 설정: build.gradle 파일을 아래와 같이 구성합니다.

    dependencies {
    	implementation 'org.springframework.boot:spring-boot-starter-web'
    	implementation 'org.springframework.kafka:spring-kafka'
    	compileOnly 'org.projectlombok:lombok'
    	annotationProcessor 'org.projectlombok:lombok'
    	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    	testImplementation 'org.springframework.kafka:spring-kafka-test'
    	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    }

4.3.2 설정

  1. 애플리케이션 설정: application.properties 파일을 아래와 같이 구성합니다.

    spring.application.name=consumer
    server.port=8091
    
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  2. Kafka 설정 클래스 작성: ConsumerApplicationKafkaConfig.java 파일을 생성하고 아래 코드를 추가합니다.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @EnableKafka
    @Configuration
    public class ConsumerApplicationKafkaConfig {
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(configProps);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
  3. 컨슈머 서비스 작성: ConsumerService.java 파일을 생성하고 아래 코드를 추가합니다.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @Service
    public class ConsumerService {
    
        @KafkaListener(groupId = "group_a", topics = "topic1")
        public void consumeFromGroupA(String message

) {
log.info("Group A consumed message from topic1: " + message);
}

    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}
```

4.4 Kafka UI를 통해 확인하기

  1. 애플리케이션 실행: 위에서 작성한 프로듀서와 컨슈머 애플리케이션을 실행합니다.

  2. Kafka UI 확인: Kafka UI에서 Topics 탭과 Consumers 탭을 확인합니다.

  3. 메시지 발행: 프로듀서 애플리케이션을 통해 토픽을 지정하고 메시지를 발행합니다. 예를 들어, 토픽을 topic1으로 지정하고 메시지를 발행하면, Kafka UI에서 해당 메시지가 수신된 것을 확인할 수 있습니다.

  4. 컨슈머 확인: 컨슈머 애플리케이션의 로그를 확인하여 메시지가 적절하게 수신되었는지 확인합니다. 그룹 ID에 따라 메시지가 분산 처리된 것을 확인할 수 있습니다.

0개의 댓글