Kafka 학습을 위한 실습 진행 중 만난 에러...
docker로 kafka 컨테이너를 실행하고 스프링부트 프로젝트에서 작성된 producer 와 consumer 를 실행하면 위와 같은 에러가 무한으로 발생했다.
딱 보면 broker는 하나인데, Replica 는 3개라고 안되는 것 같았다. (근데 어떻게 해결해)
kafka, zookeeper는 docker-compose 로 실행하였다. 기존 작성했던 producer
,consumer
, docker-compose.yml
파일을 살펴보자.
spring-initializer 를 통해 kafka, web, lombok 등의 dependecny 를 추가해주었다.
@RestController @RequiredArgsConstructor public class ProducerController { private final ProducerService producerService; @GetMapping("/send") public String sendMessage(@RequestParam("topic") String topic, @RequestParam("key") String key, @RequestParam("message") String message) { producerService.sendMessage(topic, key, message); return "Message sent to Kafka topic"; } }
@Service @RequiredArgsConstructor public class ProducerService { private final KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic , String key, String message) { for (int i = 0; i < 10; i++) { kafkaTemplate.send(topic, key, message + " " + i); } } }
간단하게 kafkaTemplate 을 통해 요청으로 들어온 topic에 메시지를 전송하는 코드이다.
@Configuration public class ProducerApplicationKafkaConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
kafka 와 연결을 하기 위한 간단한 config이다. 중요한 건
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
를 통해 kafka 브로커와 연결하는 것이다.
spring.application.name=producer server.port=8090 # 카프카 브로커 연결 spring.kafka.bootstrap-servers=localhost:9092 # key, value 직렬/역직렬화 타입 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
여기까지 봤을 때, producer 에 대한 내용은 틀린 게 없어보인다. consumer 도 살펴보자.
config 내용이나 다른 건 동일하고, kafka 의 토픽에 들어온 메시지를 수신하는 역할의 service 코드를 보자.
@Slf4j @Service public class ConsumerService { // 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드 // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정 @KafkaListener(groupId = "group_a", topics = "topic1") // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받음 public void consumeFromGroupA(String message) { log.info("Group A consumed message from topic1: " + message); } // 동일한 토픽을 다른 그룹 ID로 소비 @KafkaListener(groupId = "group_b", topics = "topic1") public void consumeFromGroupB(String message) { log.info("Group B consumed message from topic1: " + message); } // 다른 토픽을 다른 그룹 ID로 소비 @KafkaListener(groupId = "group_c", topics = "topic2") public void consumeFromTopicC(String message) { log.info("Group C consumed message from topic2: " + message); } // 다른 토픽을 다른 그룹 ID로 소비 @KafkaListener(groupId = "group_c", topics = "topic3") public void consumeFromTopicD(String message) { log.info("Group C consumed message from topic3: " + message); } @KafkaListener(groupId = "group_d", topics = "topic4") public void consumeFromPartition0(String message) { log.info("Group D consumed message from topic4: " + message); } }
@kafkaListener
를 통해 메소드 단위의 리스너들을 정의했다. groupId 와 topics 를 작성함으로써, 같은 topic에 들어온 메시지를 다른 메소드가 분배해서 소비할 수 있다.
여기까지도 별 문제가 없는 것 같다. kafka 에서 직접 topic을 미리 생성해 놓지 않아 실행 시 초기화 문제가 발생할 수도 있을까? 란 생각은 했지만, 실행 후 kafka-ui 에는 어노테이션에 작성한 4개의 topic이 생성돼 있었다.
하지만, 문제는 kafka 설정과 topic 생성할 때에 있는 것이 맞았다.
kafka 는 기본적으로 replication.factor 을 설정하여 토픽을 생성하는데, 이 값은 kafka 클러스터의 브로커 수에 따라 달라진다.
나는 broker 를 하나만 생성했기에 replication.factor 를 1개로 설정해야 했다. 기본이 3개로 실행돼서 맨 처음 봤던 에러가 발생하나보다.
그래서, docker-compose.yml
에 아래와 같이 설정을 추가해줬다.
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: sparta-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
kafka:
image: confluentinc/cp-kafka:latest
container_name: sparta-kafka
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
###### Topic 생성 시의 복제 인자를 1개로 설정
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
######
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: sparta-kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
해당 설정 내용은 docker-compose 를 사용해서 이미지를 실행시키기 위한 필요 설정들이다.
중요한 부분은 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
이다.
이 한 줄이 없어서 실습 진행도 못하고 흑흑...
실행 후 producer 앱의 api 를 요청하면 topic 에 들어온 message 를 수신한 것과 연결된 consumer를 kafka-ui 에서 쉽게 확인해 볼 수 있었다.
생성된 토픽 중 topic1
에 메시지를 전송한 모습
topic1 에 producer가 메시지를 전송한 모습
topic1 의 메시지를 소비할 수 있는 group의 consumer 들
카프카 설정이 어렵다고는 들었지만, 초반부터 힘이 빠졌다...그래도 여러가지 설정을 적용해가며 이해를 해보자~!~!