카프카(Kafka)와 스프링 부트 - springboot & kafka로 simple pub, sub 만들기

정현우·2021년 7월 18일
5

🔥 "kafka 완전 입문자" 가 spring boot로 어떻게 pub/sub 하는지에 초첨이 맞춰진 게시글 입니다

스프링 부트와 카프카

1장에서 실행한 카프카 컨테이너를 스프링 부트에서 활용해 보자, Spring boot with intelliJ IDE and Kafka setting, 모든 자세한 사항은 여기에서 찾아볼 수 있다

1. 스프링 부트 프로젝트 세팅

1) spring initianlizr

  • Dependencies에 우선 Kafka, Mysql, Lombok, JPA(없어도 무방은 할 것 같으나 추후 추가 테스트를 위해)를 추가했다.
  • 빌드는 Gradle로 설정했다. 메이븐 싫다! 😀

2) Dependencies

  • Dependencies 를 위해, 특히 DB (JPA포함) 관련 설정들 해주기

  • 위 사진에서 참조할 것은 왼쪽 경로만 참조하면 된다.
  • application properties에 하단 정보를 DB password부분만 변경해서 붙여넣자
server.address=localhost
server.port=8080

# API 호출시, SQL 문을 콘솔에 출력한다.
spring.jpa.show-sql=true

# DDL 정의시 데이터베이스의 고유 기능을 사용합니다.
# ex) 테이블 생성, 삭제 등
spring.jpa.generate-ddl=true

# MySQL 을 사용할 것.
spring.jpa.database=mysql

# MySQL 설정
spring.datasource.url=jdbc:mysql://localhost:3306/study?useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=여길바꾸세요!
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# MySQL 상세 지정
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect

############################## Kafka ##############################

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-demo
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=kafka-demo

kafka관련 설정들은 아래와 같다.

  1. spring.kafka.bootstrap-servers: 카프카서버 정보, 기본적으로 9092 포트를 사용한다.

  2. spring.kafka.consumer.group-id: 컨슈머의 그룹id

  3. spring.kafka.consumer.enable-auto-commit: 데이터를 어디까지 읽었다는 offset을 주기적으로 저장할지 여부

  4. spring.kafka.consumer.auto-offset-reset: offset에 오류가 있을 경우 어디서부터 다시 할지 여부

    • ealiest: 맨처음부터 다시 읽는다
    • latest: 이전꺼는 무시하고, 이제부터 들어오는 데이터부터 읽기 시작한다
  5. spring.kafka.producer.key-serializer: 데이터를 kafka로 전달할때 사용하는 Key Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능

  6. spring.kafka.consumer.key-deserializer: 데이터를 kafka에서 받아서 사용하는 Key Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능

  7. spring.kafka.producer.value-serializer: 데이터를 kafka로 전달할때 사용하는 Value Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능

  8. spring.kafka.consumer.value-deserializer: 데이터를 kafka에서 받아서 사용하는 Value Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능

  9. spring.kafka.consumer.max-poll-records: consumer가 한번에 가져오는 message 갯수

  10. spring.kafka.template.default-topic: 기본 설정 topic name


2. Controller & Service logic

1) 프로젝트 구성도와 service

  • 프로젝트 구성도는 위와 같고, restAPI 아키텍처를 따를 것이다. controller / service 패키지를 src > main > java > com.kafka.demo 하위에 만들어 주자.
package com.kafka.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "kafka-demo";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}
  • KafkaProducer는 위와 같다. TOPIC은 properties에서 설정한 토픽으로 설정해줘야 한다.
  • 아 물론 당연히 저 토픽 부분이 앞으로는 request에서 header, body(data), params 등 으로 가변적인 요소가 될 것이다.
  • kafkaTemplate API를 살펴보는 것도 좋은 방법이다.
  • 핵심은 producer는 this.kafkaTemplate.send(TOPIC, message); 를 통해서 TOPIC에 해당하는 message를 전달할 것이다.
package com.kafka.demo.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "kafka-demo", groupId = "kafka-demo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }
}
  • KafkaConsumer는 위와 같다. topics와 groupId는 우선 properties에서 설정한 것으로 해주자.
  • 어노테이션이 @KafkaListener 와 같다. 이 부분은 데브원영 님이 kafka의 대가!

2) Controller 기본 세팅 완성

package com.kafka.demo.controller;

import com.kafka.demo.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);

        return "success";
    }
}
  • 마지막으로 controller는 위와 같다.
  • 특별한 것 전혀없이, POST 형식의 request를 message 부분을 data로 받는 것이고, success라는 string만 return한다.

3. Build와 Deploy후 실제 test하기

1) gradle build & war

2) tomcat server config & run

  • 톰켓 서버 러닝을 성공한다면 다음과 같다 :) 왜냐면 아무 설정도 안했으니까 위와 같이 뜬다!

3) 이게 뭐에요,, 서버 에러 로그 계속 찍히잖아요!

2021-07-18 21:20:03.012  WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:03.855  WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-07-18 21:20:03.856  WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:04.746  WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
  • Kafka도 당연히 서버가 필요하고, '브로커'가 Producer되는 메신저를 다 받아서 전달한다. 조금 더 자세히 알아보고 싶으면 클릭
  • 브로커 서버를 러닝해라! 1장에서 브로커 Dokcer Container를 기억할 것 이다!

  • 이번엔 docker에서 핫하게 밀어주시는 App으로 조져보자 (mac기준이다)
  • 그냥,, container 러닝만 하면된다 ^^,, container를 제거한 사람은 docker-compose up -d 를 통해 (물론 yml 파일이 있는 곳에서 ^^) 다시 러닝해주면 된다.

4) 다시 서버 실행, 그리고 실제 test

  • 뭐 도구는 알아서 선택하고, 러닝한 서버에 위와 같이 request를 보내보자!
  • 콘솔창에서 확인해보면, Produce message : Hello Kafka World! 와 같이 우리가 설정한 로그가 찍혀있는 것을 확인할 수 있다.
  • 위에서 우리가 러닝한 컨테이너 (Kafka 요놈)에 접근해보자! 어떻게요? 라면 클릭

  • container로 접근해서 consumer를 실행시키는 shell을 통해서 'kafka-demo'에 해당하는 토픽을 'from-beginning'으로 찍어보니 우리가 보낸 메시지, Hello Kafka World를 볼 수 있다.
  • 실시간으로 직접 해보자! 생각보다 퍼포먼스가 굉장히 빠르다는 것을 바로 체감할 수 있다!!

4. 마무리

1) 다시 처음으로 돌아가서

profile
도메인 중심의 개발, 깊이의 가치를 이해하고 “문제 해결” 에 몰두하는 개발자가 되고싶습니다. 그러기 위해 항상 새로운 것에 도전하고 노력하는 개발자가 되고 싶습니다!

0개의 댓글