[서버] Kafka 와 Spring Boot 애플리케이션 연동

최동근·2023년 10월 21일
0

안녕하세요 이번 포스팅에서는 Kafka 서버를 구축하고 Spring Boot 서버와 연동시키는 간단한 실습을 해보겠습니다.
[서버] Kafka 에 대해서 포스팅을 먼저 참고해주세요 ❗️

🎮 카프카(Kafka) 서버 구축하기

1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드 및 구성 살펴보기

가장 먼저, Apache Kafka 다운로드 공식 홈페이지에 들어가 Kafka 최신 버전을 다운받습니다.
저는 2023 년 10월에 출시된 3.6.0 버전을 다운받겠습니다.

저는 다운로드 받은 압축 파일을 제 로컬 기준 Desktop/ForCoding 폴더에 위치시키겠습니다.

해당 이미지 처럼 다운로드 받은 Kafka 관련 폴더와 tgz 파일이 하나씩 존재합니다.
이번에는 kafka_2.13-3.6.0 폴더에 들어가서 구성을 살펴보겠습니다 ❗️

여기서 주로 사용하는 폴더는 binconfig 폴더입니다.
bin 폴더는 kafka 관련 각종 실행 쉘 커맨드 파일이 있으며, config 폴더에는 설정 관련 파일이 존재합니다.

2. Kafka Zookeeper 구동하기

모든 쉘 커맨드 실행 위치는 kafka_2.13-3.6.0 입니다.

첫번째로, Kakfa Cluster 을 관리하고 분산 처리를 조정하는 역할을 하는 Zookeeper 을 구동시킵니다.
Zookeeper 는 밑에 있는 쉘 커맨드를 통해 구동시키겠습니다.

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

명령어에서 알 수 있듯이, 현재 위치에서 bin 폴더에 존재하는 zookeeper-server-start.sh 쉘 커맨드를 구동하는데 이때, config 폴더에 존재하는 zookeepr.properties 설정 파일을 참조합니다.

Zookeeper 는 기본적으로 2181 포트에서 구동됩니다 ❗️

3. Kafka Broker 구동하기

Kafka 의 핵심인 Kafka Broker 을 구동해보겠습니다.
주의할 점은 Zookeeper 서버가 구동된 상태일때만 Kafka Broker 구동이 가능합니다 👨‍💻
마찬가지로 Apache Kafka 에서 다운받았던 Kafka 폴더로 제공되는 쉘 커맨드를 통해 구동합니다.

./bin/kafka-server-start.sh ./config/server.properties

명령어 구성이 Zookeeper 서버를 구동할때랑 비슷합니다.
bin 폴더에 존재하는 kafka-server-start.sh 쉘 커맨드를 구동하며, cofig 폴더의 server.properties 설정 파일을 참조합니다.

Kafka Broker 는 기본적으로 9092 포트에서 구동됩니다 ❗️

4. Topic 생성 및 확인

마지막으로 구동시킨 Kafka Brokermy-topic 이라는 topic 을 만들겠습니다.
다운받았던 Kafka 폴더는 topic 관련 쉘 커맨드 또한 제공합니다.

  • topic 생성

    ./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1

    여기서 bootstrap-serverKafka Cluster 접속하기 위한 초기 연결 주소를 나타냅니다.
    과거 Kafka 버전에서는 Zookeeper 을 통해 초기 연결을 진행했으나, 최신 버전에서는 bootstrap-server 옵션을 통해 클라이언트가 Kafka Broker 에 직접 연결을 시도합니다.
    이를 통해 Kafka Broker 에 연결할 수 있으며, 해당 주소를 통해 Kafka 클라이언트는 Topic 을 생성하거나 데이터를 전송할 수 있습니다.
    여기서는 로컬의 9092 포트 즉, 앞에서 생성한 Kafka Broker 연결을 했습니다.

    또한, 여기서 Partitons 이란 topic 을 구성하는 partition 의 복제수를 의미합니다.
    Kafka 의 고가용성(High Availability) 을 보장합니다 ❗️

    : [Kafka] Kafka Topic Replication

  • topic 삭제

    ./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092

    해당 명령어를 통해 생성한 topic 을 삭제할 수 있습니다.

  • topic 목록 확인

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

    이번에는 생성한 topic 목록을 확인해보겠습니다.

    이미지 처럼 생성한 topic 인 my-topic 이 조회되는 것을 확인할 수 있습니다.

  • topic 정보 확인

    ./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092

    해당 명령어를 통해 생성한 topic 의 정보을 확인할 수 있습니다.

    topic 이름, topic id , 복제수 등의 정보를 확인할 수 있습니다 👨‍💻

이렇게 간단하게 쉘 커맨드와 설정 파일을 통해 Zookeeper 서버와 Kafka Broker 을 구동시켰습니다 ❗️

5. Docker 을 이용해 카프카 서버 구축하기 (번외 편)

지금까지 Apache Kafka 홈페이지에서 다운로드 받은 Kafka 을 이용해서 Zookeeper 와 Kafka Broker 을 구동시켜보았습니다.
여기서는 Docker 을 이용해서 동일하게 Zookeeper 와 Kafka 을 구동시켜보겠습니다 ❗️


🎮 Producer & Consumer 을 구동하고 테스트 진행

이번에는 메세지(이벤트) 을 생성하는 Producer 와 Consumer 을 쉘 커맨드를 통해 구현해보고 테스트해보겠습니다.
Producer 와 Consumer 을 구동하기 위해서는 앞서 구동시켰던 ZookeeperKafka Broker 가 구동된 상태여야 합니다 ❗️

1. Producer 구동하기

./bin/kafka-console-producer.sh --broker-list localhost 9092 --topic [발행할 topic 이름]

동일하게 다운받았던 Kafka 폴더로 제공된 쉘 커맨드로 구동됩니다.

명령어가 정상적으로 처리가 되어 > 입력창이 뜨는 것을 확인할 수 있습니다.
여기에 메세지(이벤트) 을 발행하면 해당 topic(=my-topic) 을 구독한 Consumer 가 해당 메세지(이벤트) 을 읽어올 수 있습니다.

2. Consumer 구동하기

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [구독할 topic 이름] --from-beginning

해당 명령어를 통해 my-topic 을 구독하는 Consumer 을 구동시킬 수 있습니다.
이때 --from-beginning 옵션은 Consumer 가 구독하는 topic 에 저장된 모든 이전 메세지까지 읽어오는 기능입니다.

3. Producer 을 통해 메세지을 발행하고 Consumer 을 통해 메세지 읽기

  • Producer로 my-topic 에 메세지 발행
  • Consumer로 my-topic 으로부터 메세지 읽기

🎮 Kafka 와 Spring boot 연동하기

이번 파트에서는 Kafka 와 Spring boot 애플리케이션을 연동해보겠습니다 ❗️
Spring boot 을 통해 ProducerConsumer 을 구현할 것이기 때문에 앞서 구현했던 ZookeeperKafka Broker 가 구동된 상태여야 합니다.
자세한 코드는 여기 에서 확인 가능합니다.

1. Spring Kafka 의존성 추가

implementation 'org.springframework.kafka:spring-kafka'

spring kafka 프로젝트는 Spring 에서 Apache Kafka 을 쉽게 사용할 수 있도록 추상화하여 제공하는 프로젝트입니다.
Producer,Consumer 애플리케이션의 build.gradle 에 이와 같이 의존성을 추가해줍니다 ❗️

2. Producer application 구성하기(8080 port)

  • application.yml
server:
  port: 8080


spring:
  application:
    name: producer_application

  kafka:
    producer:
      bootstrap-servers: localhost:9092 # Kafka 클러스터에 대한 초기 연결에 사용할 호스트 : 포트 목록
       ## serializer 방법은 KafkaProducerConfig 로 설정
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer

topic:
  name: my-topic
  • KafkaProducerConfig.Class
@Configuration
public class KafkaProducerConfig {

    private final Environment env;

    KafkaProducerConfig(Environment environment) {
        this.env = environment;
    }


    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getProperty("spring.kafka.producer.bootstrap-servers"));

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
                , StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
                , StringSerializer.class);
        return props;
    }


    public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(this.producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

: #03 Spring boot & Kafka 연동하기

KafkaProducerConfig 클래스는 실제 Kafka Broker 에 메세지를 보내는 역할을 하는 KafkaTemplate 을 구성하고 Bean 으로 등록하는 역할을 합니다.

Producer 설정을 하는 방법에는 application.yml 작성 법과 Bean 등록 법이 있는데 이번 포스팅에서는 둘을 적절히 혼합했습니다.

  • KafkaProducerController & KafkaProducerService

저는 API 호출을 통해 KafkaProducerController 로 쿼리 스트링으로 메세지를 전달하고 KafkaProducerService 에서 KafkaTemplate 을 통해 Kafka Broker 로 메세지를 전송하려고 합니다.

// KafkaProducerController
@RequiredArgsConstructor
@RequestMapping("/kafka")
@RestController
public class KafkaProducerController {

    private final KafkaProducerService kafkaProducerService;

    @PostMapping
    public ResponseEntity<Void> sendMassage(
            @RequestParam String message) {
        this.kafkaProducerService.sendMessageToKafka(message);
        return ResponseEntity.ok().build();
    }
}
// KafkaProducerService
/* 바라보고 있는 kafka broker topic 메세지 발행 */
@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaProducerService {

    @Value("${topic.name}")
    private String topicName;

    /* Kafka Template 을 이용해 Kafka Broker 전송 */

    private final KafkaTemplate<String,String> kafkaTemplate;

    public void sendMessageToKafka(String message) {
        System.out.printf("Producer Message : %s%n",message);
        this.kafkaTemplate.send(topicName,message);
    }
}

3. Consumer application 구성하기

Consumer application (ConsumerApplication01, ConsumerApplication02) 을 구동할 것입니다.
두 application 의 구성(이름 제외)은 완전히 동일함으로 ConsumerApplication01 하나만 살펴보겠습니다 ❗️

  • application.yml
server:
  port: 8090

spring:
  application:
    name: consumer_application01

  kafka:
    consumer:
      bootstrap-servers: localhost:9092 # Kafka 클러스에 대한 초기 연결에 사용할 호스트 : 포트 목록
      group-id: consumer_group01 # Group Id
      auto-offset-reset: earliest # offset 이 없거나 더 이상 없는 경우 어떻게 처리할지 전략 결정
       ## Deserialze 방법은 KafkaConsumerConfig 로 설정
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • KafkaConsumerConfig
@EnableKafka # 필수 어노테이션
@Configuration
public class KafkaConsumerConfig {

    private final Environment env;

    KafkaConsumerConfig(Environment env) {
        this.env = env;
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,env.getProperty("spring.kafka.consumer.bootstrap-servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,env.getProperty("spring.kafka.consumer.group-id"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,env.getProperty("spring.kafka.consumer.auto-offset-reset"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.consumerConfig());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        return factory;
    }
}

KafkaConsumerConfig 클래스는 ConcurrentKafkaListenerContainerFactory 클래스를 생성하고 ConsumerFactory 인터페이스를 내부 멤버변수에 Set 하고 Bean 으로 등록하기 위한 클래스입니다.

  • KafkaConsumerService
// ConsumerApplication01 의 KafkaConsumerService01
@Slf4j
@Service
public class KafkaConsumerService01 {

    @KafkaListener(topics = "my-topic", groupId = "consumer_group01")
    public void consume(String message) throws IOException {
        System.out.printf("Consumed Message : %s%n", message);
    }
}

여기서는 해당 Consumer Application 이 구독한 Kafka Broker 의 Topic 의 Partition 에서 message 을 받아와서 출력하는 코드로 구성됩니다.
@KafkaListener 속성에 구독하는 Topic 과 해당 Consumer 가 속하는 Group Id 을 설정 만 해주면 끝납니다 ❗️

4. 결과 테스트

먼저 아까 구현한 KafkaProducerController API 호출을 진행합니다.
쿼리 스트링으로 new message 라는 메세지를 my-topic 에 발행해보겠습니다.

해당 이미지는 실제 구동된 ConsumerApplication01ConsumerApplication02 콘솔에 찍힌 전달된 메세지 결과입니다.
ProducerApplication 부터 KafkaBroker 을 거쳐 두개의 ConsumerApplication 까지 메세지가 잘 전달되는 것을 확인할 수 있습니다.

메세지는 Group 이 같은 Consumer 에게는 라운드 로빈 방식으로 전달되고, Group 이 다른 Consumer 에게는 모두 메세지가 전달됩니다 ❗️


🎮 마무리

이번 포스팅에서는 간단한 Kafka System 을 구축하고 Spring boot 와 연동해보았습니다 ❗️
비록 아주 기본적인 실습이였지만 이번 포스팅을 통해 기존에 Kafka 에 대해 잘못 이해했던 부분들을 바로 잡을 수 있었던 좋은 시간이였습니다.
현재 Kafka 는 많은 기업에서 사용하고 있는 만큼, 개인적으로 실제 프로젝트에 도입해보고 싶은 생각이있습니다.
긴글 읽어주셔서 감사합니다
🙏


참고

[SpringBoot] 카프카와 스프링부트 연동
🙈[SpringBoot] Kafka 연동하기🐵
Spring Boot에서 Apache Kafka 사용 ... 1/2
Spring Kafka 프로젝트
Producer & Consumer 설정을 위한 class 작성

profile
비즈니스가치를추구하는개발자

0개의 댓글