🔥 "kafka 완전 입문자" 가 spring boot로 어떻게 pub/sub 하는지에 초첨이 맞춰진 게시글 입니다
1장에서 실행한 카프카 컨테이너를 스프링 부트에서 활용해 보자, Spring boot with intelliJ IDE and Kafka setting, 모든 자세한 사항은 여기에서 찾아볼 수 있다
server.address=localhost
server.port=8080
# API 호출시, SQL 문을 콘솔에 출력한다.
spring.jpa.show-sql=true
# DDL 정의시 데이터베이스의 고유 기능을 사용합니다.
# ex) 테이블 생성, 삭제 등
spring.jpa.generate-ddl=true
# MySQL 을 사용할 것.
spring.jpa.database=mysql
# MySQL 설정
spring.datasource.url=jdbc:mysql://localhost:3306/study?useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=여길바꾸세요!
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# MySQL 상세 지정
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
############################## Kafka ##############################
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-demo
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=kafka-demo
spring.kafka.bootstrap-servers: 카프카서버 정보, 기본적으로 9092 포트를 사용한다.
spring.kafka.consumer.group-id: 컨슈머의 그룹id
spring.kafka.consumer.enable-auto-commit: 데이터를 어디까지 읽었다는 offset을 주기적으로 저장할지 여부
spring.kafka.consumer.auto-offset-reset: offset에 오류가 있을 경우 어디서부터 다시 할지 여부
ealiest
: 맨처음부터 다시 읽는다latest
: 이전꺼는 무시하고, 이제부터 들어오는 데이터부터 읽기 시작한다 spring.kafka.producer.key-serializer: 데이터를 kafka로 전달할때 사용하는 Key Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.key-deserializer: 데이터를 kafka에서 받아서 사용하는 Key Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.producer.value-serializer: 데이터를 kafka로 전달할때 사용하는 Value Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.value-deserializer: 데이터를 kafka에서 받아서 사용하는 Value Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.max-poll-records: consumer가 한번에 가져오는 message 갯수
spring.kafka.template.default-topic: 기본 설정 topic name
package com.kafka.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "kafka-demo";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
this.kafkaTemplate.send(TOPIC, message);
를 통해서 TOPIC에 해당하는 message를 전달할 것이다. package com.kafka.demo.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "kafka-demo", groupId = "kafka-demo")
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s", message));
}
}
@KafkaListener
와 같다. 이 부분은 데브원영 님이 kafka의 대가! package com.kafka.demo.controller;
import com.kafka.demo.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
2021-07-18 21:20:03.012 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:03.855 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-07-18 21:20:03.856 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:04.746 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
Produce message : Hello Kafka World!
와 같이 우리가 설정한 로그가 찍혀있는 것을 확인할 수 있다.