[Spring] Spring (boot, auto configuration) + Kafka 예제

최진민·2021년 11월 9일
0

Kafka

목록 보기
3/3
post-thumbnail

📠 1. 설정

  • build.gradle

    plugins {
        id 'org.springframework.boot' version '2.5.6'
        id 'io.spring.dependency-management' version '1.0.11.RELEASE'
        id 'java'
    }
    
    group = 'me.jinmin'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '11'
    
    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.kafka:spring-kafka'
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.kafka:spring-kafka-test'
    }
    
    test {
        useJUnitPlatform()
    }
    
  • application.yml

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: foo
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    • spring.kafka.consumer
      • bootstrap-servers
        • Kafka 연결에 사용될 호스트:포트
        • 글로벌 설정도 있지만 consumer 전용으로 오바리이딩
      • group-id
        • Consumer는 유일하게 식별 가능한 그룹이 존재하는데 이를 작성
      • auto-offset-reset
        • Kafka 초기 offset이 없거나, 더이상 offset이 없을때 수행
          • latest : 가장 최근에 생산된 메시지 offset reset
          • earliest : 가장 오래된 메시지로 offset reset
          • none : offset 정보없을때, Exception 발생
      • key-deserializer / value-deserializer
        • Kafka에서 데이터를 수신할 때, key / value 역직렬화
        • KafkaTemplate과 매핑
        • (Json 데이터일 경우, JsonDeserializer)
    • spring.kafka.producer
      • bootstrap-servers
        • consumer와 동일
      • key-serializer / value-serializer
        • Kafka에서 데이터를 송신할 때, key / value 직렬화
  • docker-compose.yml

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka:2.12-2.5.0
        container_name: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock


🚘 2. 구현

  • Producer

    package me.jinmin.springkafkaautoconfig;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    @RequiredArgsConstructor
    public class KafkaProducer {
        private static final String TOPIC = "jinmin";
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessgae(String msg) {
            System.out.println(String.format("Produce message : %s", msg));
            kafkaTemplate.send(TOPIC, msg);
        }
    }
    • KafkaTemplate을 통해 해당되는 TOPIC에 메시지를 생산한다.
  • Consumer

    package me.jinmin.springkafkaautoconfig;
    
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    
    @Service
    public class KafkaConsumer {
    
        @KafkaListener(
                topics = "jinmin",
                groupId = "foo"
        )
        public void listen(String msg) throws IOException {
            System.out.println(String.format("Consumed message : %s", msg));
        }
    }
    • KafkaListener를 통해 topicgroupId에 해당되는 메시지를 소비한다.
  • Controller

    package me.jinmin.springkafkaautoconfig;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequiredArgsConstructor
    public class Controller {
    
        private final KafkaProducer kafkaProducer;
    
        @PostMapping("/kafka")
        public String sendMsg(@RequestParam("message") String message) {
            kafkaProducer.sendMessgae(message);
            return "Successful Sending!!";
        }
    }


⛱ 3. 테스트

  • 애플리케이션을 구동하기 전, 기존에 작성한 docker-compose.yml 설정을 통해 DockerZookeeperKafka 컨테이너를 먼저 실행하자.
    • 위와 같이 실행되면 연결 성공!
  • 연동을 통해 토픽을 제대로 생성했는지 확인
    docker container exec -it [container_name] bash
    • jinmin이라는 토픽을 제대로 생성
  • 메시지 생산 / 소비
    • 메시지를 URL을 통해 보내기 전, 카프카가 이를 소비하는 지 확인할 수 있도록 명령어를 통해 Consumer를 구동시켜놓자
    • 과정
    • 실제 Run
profile
열심히 해보자9999

0개의 댓글