Spring Boot Kafka 사용

최병훈·2024년 10월 9일
post-thumbnail

1. Spring Boot Kafka 사용

1)JDK 17 이상 버전 설치

  • 설치 확인
    • Java Runtime(실행 환경) 확인: java -version
    • Java Development Kit(개발 환경) 확인: javac -version

2)Spring Framework IDE 설치

  • Intelli J Ultimate 버전 설치

3)하나의 자바 애플리케이션에서 게시와 구독

  • Spring Boot Project 생성

    • 의존성 설정 (JDK가 기본적으로 제공하지 않는 라이브러리를 사용)
      - Lombok
      - Spring Boot DevTools
      - Spring Web
      - Spring Apache Kafka
  • yaml(yml) 파일 작성 요령

    • 내부 속성을 설정 : : 를 하고 들여쓰기를 한 후 작성
    • 속성 : 의 형태로 작성
    • 배열 : - 값 의 형태로 줄 단위로 나열
  • kafka 설정

    • resources 디렉토리의 application.properties 파일을 삭제하고 application.yml 파일을 생성하고 작성

      spring:
        kafka:
          bootstrap-servers: localhost:9092
          consumer:
            group-id: bh
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          producer:
            key-serializer: org.apache.kafka.common.serialization.StringDeserializer
            value-serializer: org.apache.kafka.common.serialization.StringDeserializer
    • 설정 클래스를 추가하고 작성(KafkaConfiguration)

      package com.bh.kafka_example;
      
      // Java 에서 import 는 짧게 쓰기 위해서 사용
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      
      import java.util.HashMap;
      import java.util.Map;
      
      // 환경 설정 클래스라는 것을 알려주는 어노테이션
      // 인스턴스를 생성해서 직접 관리하지 않고 프레임워크가 생명 주기를 관리 : 제어의 역전, IoC(Inversion of Control)
      @Configuration
      public class KafkaConfiguration {
          // 설정 파일에서 값을 가지고 와서 주입하는 코드
          @Value("${spring.kafka.bootstrap-servers}")
          private String bootstrapServers;
      
          // 메세지를 게시하는 프로듀서의 설정
          @Bean
          public ProducerFactory<String, String> producerFactory() {
              Map<String,Object> configs = new HashMap<>();
              configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return new DefaultKafkaProducerFactory(configs);
          }
      
          // 카프카 사용을 위한 인스턴스를 생성해주는 메서드
          @Bean
          public KafkaTemplate<String, String> kafkaTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }
      }
    • 메시지를 게시하는 클래스를 생성(KafkaProducer)

      package com.bh.kafka_example;
      
      import lombok.RequiredArgsConstructor;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Service;
      
      //서비스 클래스 라는 것을 명시하고 인스턴스를 자동으로 생성해달라고 하는 어노테이션
      @Service
      //자동 주입되는 인스턴스를 대입받는 생성자를 만들어 달라는 어노테이션
      @RequiredArgsConstructor
      public class KafkaProducer {
         //토픽 이름 설정
         private static final String TOPIC = "exam-topic";
      
         //의존성 주입을 받기 위한 어노테이션
         @Autowired
         private  KafkaTemplate<String, String> kafkaTemplate;
         //로그 출력하기 위한 인스턴스를 생성
         private final Logger log = LoggerFactory.getLogger(getClass());
      
         //메시지 전송하는 메서드
         public void sendMessage(String name, int age) {
             log.info("Produce message : {}{}", name, age);
             //System.out.println("전송된 메시지:" +  name + age);
             String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age + "}";
             //실제 메시지 전송
             this.kafkaTemplate.send(TOPIC, message);
         }
      }
    • JSON 파싱을 위한 라이브러리의 의존성을 설정(build.gradle 파일의 dependencies 에 추가)

      implementation 'org.json:json:20190722'

      추가하고 gradle 업데이트를 수행

    • 메시지 구독 클래스를 생성(KafkaConsumer)

      package com.bh.kafka_example;
      
      import org.json.JSONObject;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Service;
      
      import java.io.IOException;
      
      @Service
      public class KafkaConsumer {
          private final Logger log = LoggerFactory.getLogger(getClass());
      
          //exam-topic에 들어오는 메시지를 읽어내는 메서드: 비동기적으로 백그라운드에서 수행
          //토픽이 들어오면 자동으로 호출
          @KafkaListener(topics = "exam-topic", groupId = "adamsoft")
          public void consume(String message) throws IOException {
              log.info("Consumed message : {}", message);
              JSONObject messageObj = new JSONObject(message);
              log.info(messageObj.getString("name"));
              log.info(messageObj.getInt("age") + "");
          }
      }
    • 사용자의 요청을 처리하는 클래스 생성(KafkaController)

      package com.bh.kafka_example;
      
      import lombok.RequiredArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.*;
      
      //데이터를 리턴하는 컨트롤러: REST API를 만들기 위한 어노테이션
      @RestController
      //요청 경로
      @RequestMapping(value = "/kafka")
      @Slf4j
      @RequiredArgsConstructor
      public class KafkaController {
      
          @Autowired
          private KafkaProducer producer;
      
          //POST 방식으로 요청이 오면 처리
          @PostMapping
          @ResponseBody
          public String sendMessage(@RequestParam("name") String name, @RequestParam("age") int age) {
              this.producer.sendMessage(name, age);
              return "success";
          }
      }
    • 실행 : 8080번 포트로 애플리케이션 시작

    • POSTMAN 을 이용해서 http://127.0.0.1:8080/kafka 에 POST 방식으로 name 과 age 의 값을 설정해서 전송

    • 인텔리제이의 콘솔창에서 메시지가 출력되는지 확인
      : KafkaConsumer 가 출력

    • 실제 구현을 할 때는 하나의 애플리케이션이 구독과 게시를 모두 할 수 도 있고 어느 하나만 수행하기도 합니다.

0개의 댓글