Spring boot로 kafka와 연결하기(kafka replication error)

twonezero·2024년 8월 19일
1

Kafka 학습을 위한 실습 진행 중 만난 에러...
docker-compose-kafka-error

docker로 kafka 컨테이너를 실행하고 스프링부트 프로젝트에서 작성된 producer 와 consumer 를 실행하면 위와 같은 에러가 무한으로 발생했다.

딱 보면 broker는 하나인데, Replica 는 3개라고 안되는 것 같았다. (근데 어떻게 해결해)

확인 작업

kafka, zookeeper는 docker-compose 로 실행하였다. 기존 작성했던 producer ,consumer, docker-compose.yml 파일을 살펴보자.

Producer

spring-initializer 를 통해 kafka, web, lombok 등의 dependecny 를 추가해주었다.

Controller & Service 코드

@RestController
@RequiredArgsConstructor
public class ProducerController {
    private final ProducerService producerService;
    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {
        producerService.sendMessage(topic, key, message);
        return "Message sent to Kafka topic";
    }
}
@Service
@RequiredArgsConstructor
public class ProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String topic , String key, String message) {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send(topic, key, message + " " + i);
        }
    }
}

간단하게 kafkaTemplate 을 통해 요청으로 들어온 topic에 메시지를 전송하는 코드이다.

Configuration for kafka

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

kafka 와 연결을 하기 위한 간단한 config이다. 중요한 건 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 를 통해 kafka 브로커와 연결하는 것이다.

application.properties

spring.application.name=producer
server.port=8090
# 카프카 브로커 연결
spring.kafka.bootstrap-servers=localhost:9092
# key, value  직렬/역직렬화 타입
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

여기까지 봤을 때, producer 에 대한 내용은 틀린 게 없어보인다. consumer 도 살펴보자.

Consumer

config 내용이나 다른 건 동일하고, kafka 의 토픽에 들어온 메시지를 수신하는 역할의 service 코드를 보자.

Kafka Listener

@Slf4j
@Service
public class ConsumerService {
    // 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드
    // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정
    @KafkaListener(groupId = "group_a", topics = "topic1")
    // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받음
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }
    // 동일한 토픽을 다른 그룹 ID로 소비
    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }
    // 다른 토픽을 다른 그룹 ID로 소비
    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }
    // 다른 토픽을 다른 그룹 ID로 소비
    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }
    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}

@kafkaListener 를 통해 메소드 단위의 리스너들을 정의했다. groupId 와 topics 를 작성함으로써, 같은 topic에 들어온 메시지를 다른 메소드가 분배해서 소비할 수 있다.

여기까지도 별 문제가 없는 것 같다. kafka 에서 직접 topic을 미리 생성해 놓지 않아 실행 시 초기화 문제가 발생할 수도 있을까? 란 생각은 했지만, 실행 후 kafka-ui 에는 어노테이션에 작성한 4개의 topic이 생성돼 있었다.

해결 : docker-compose.yml 수정

하지만, 문제는 kafka 설정과 topic 생성할 때에 있는 것이 맞았다.

kafka 는 기본적으로 replication.factor 을 설정하여 토픽을 생성하는데, 이 값은 kafka 클러스터의 브로커 수에 따라 달라진다.

나는 broker 를 하나만 생성했기에 replication.factor 를 1개로 설정해야 했다. 기본이 3개로 실행돼서 맨 처음 봤던 에러가 발생하나보다.

그래서, docker-compose.yml 에 아래와 같이 설정을 추가해줬다.

version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: sparta-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: sparta-kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      
 ###### Topic 생성 시의 복제 인자를 1개로 설정
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 ######
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: sparta-kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181



해당 설정 내용은 docker-compose 를 사용해서 이미지를 실행시키기 위한 필요 설정들이다.

중요한 부분은 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR 이다.
이 한 줄이 없어서 실습 진행도 못하고 흑흑...

실행 후 producer 앱의 api 를 요청하면 topic 에 들어온 message 를 수신한 것과 연결된 consumer를 kafka-ui 에서 쉽게 확인해 볼 수 있었다.

  • 생성된 토픽 중 topic1에 메시지를 전송한 모습

  • topic1 에 producer가 메시지를 전송한 모습

  • topic1 의 메시지를 소비할 수 있는 group의 consumer 들

카프카 설정이 어렵다고는 들었지만, 초반부터 힘이 빠졌다...그래도 여러가지 설정을 적용해가며 이해를 해보자~!~!

profile
소소한 행복을 즐기는 백엔드 개발자입니다😉

0개의 댓글