Implementing a Kafka Producer and Consumer with Spring Boot

Karim · July 28, 2025


1. Version

💬

  • JDK: 21
  • Kafka: 3.5.1
  • Spring Boot: 3.5.4

2. build.gradle

💬 build.gradle dependencies (only the Kafka line is shown; the code below also relies on Spring Web and Lombok)

implementation 'org.springframework.kafka:spring-kafka'

3. application.yml

server:
  port: 8080

spring:
  application:
    name: kafka_spring

  kafka:
    producer:
      bootstrap-servers: kafka-server:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: kafka-server:9092
      group-id: karim
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic:
  consumer:
    name: karim-rcv

4. Kafka Producer 구현

✒️

This code takes data received through a REST endpoint and sends the requested message to the server where Kafka is installed.

  • KafkaProducerConfig
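import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Publishes messages to Kafka; despite the "Config" suffix, this is a service bean.
@Service
@RequiredArgsConstructor
public class KafkaProducerConfig {

    // Auto-configured by Spring Boot from spring.kafka.producer.* in application.yml.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String msg, String topic) {
        // Log the request, then hand the record to the template; send() is asynchronous.
        System.out.printf("Produce topic : %s, message : %s%n", topic, msg);
        kafkaTemplate.send(topic, msg);
    }
}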
  • ProducerController
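import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final KafkaProducerConfig kafkaProducerConfig;

    // Example request body: {"topic":"karim-send", "message":"test"}
    @PostMapping("/kafka")
    public String sendMsg(@RequestBody ProducerMessage producerMessage) {
        // Forward the message to the topic named in the request.
        kafkaProducerConfig.sendMessage(producerMessage.getMessage(), producerMessage.getTopic());
        return "OK";
    }
}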
  • ProducerMessage
import lombok.Getter;
import lombok.Setter;

// Request body for POST /kafka.
@Getter
@Setter
public class ProducerMessage {

    private String topic;
    private String message;
}
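
`KafkaTemplate.send` is asynchronous: in Spring for Apache Kafka 3.x it returns a `CompletableFuture`, so the `printf` in `sendMessage` only shows that a send was requested, not that the broker accepted the record. The following is a minimal sketch of the callback pattern using plain JDK types so that it runs without a broker. `SendResultLoggerSketch`, `describe`, `withLogging`, and the `String` that stands in for Spring's `SendResult` are hypothetical names for illustration, not part of the original project.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; a String stands in for Spring Kafka's SendResult.
class SendResultLoggerSketch {

    // Turns the outcome of an asynchronous send into a log line.
    static String describe(String topic, String result, Throwable error) {
        if (error != null) {
            return "Send to " + topic + " failed: " + error.getMessage();
        }
        return "Send to " + topic + " succeeded: " + result;
    }

    // Attaches a completion callback to a future such as the one a send call returns.
    static CompletableFuture<String> withLogging(String topic, CompletableFuture<String> sendFuture) {
        return sendFuture.handle((result, error) -> {
            String line = describe(topic, result, error);
            System.out.println(line);
            return line;
        });
    }
}
```

In the service above, a similar callback could be attached with `whenComplete` on the future returned by `kafkaTemplate.send(topic, msg)`, so that failed sends show up in the log instead of passing silently.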

5. Kafka Consumer 구현

✒️

This code receives data sent from the Kafka console producer in the Spring Boot server.

  • KafkaConsumerConfig
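import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Values are read from spring.kafka.consumer.* in application.yml.
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    // Builds consumers that read String keys and String values.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory used by @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}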
  • ConsumerListener
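import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {

    // Topic name and group ID are resolved from application.yml.
    @KafkaListener(topics = "${topic.consumer.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}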

6. Run log

💬 Producer test

  • Postman: POST /kafka with body {"topic":"karim-send", "message":"test"}

  • Spring Boot log

Produce topic : karim-send, message : test
  • Kafka console consumer
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic karim-send
test
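
The producer test above can also be reproduced without Postman. The following is a minimal sketch using the JDK's built-in `java.net.http.HttpClient`. The class name `ProducerClientSketch`, the host `localhost:8080` (taken from `server.port` in application.yml, assuming the application runs locally), and the hand-rolled JSON escaping are assumptions for illustration, not part of the original project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the original project.
class ProducerClientSketch {

    // Assumes the application runs locally on the port set in application.yml.
    static final String ENDPOINT = "http://localhost:8080/kafka";

    // Escapes backslashes and double quotes only; enough for simple test messages.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the JSON body that ProducerMessage is bound from.
    static String toJson(String topic, String message) {
        return "{\"topic\":\"" + escape(topic) + "\",\"message\":\"" + escape(message) + "\"}";
    }

    // Creates the POST request without sending it.
    static HttpRequest buildRequest(String topic, String message) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(topic, message)))
                .build();
    }

    // Sends the request and returns the response body as text.
    static String send(String topic, String message) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(topic, message), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the application running, calling `ProducerClientSketch.send("karim-send", "test")` should reach the `/kafka` endpoint the same way the Postman request does.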

💬 Consumer test

  • Kafka console producer
[root@localhost bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic karim-rcv
>karim
  • Spring Boot log
Received message: karim

