Kafka : 3.2.0
SpringBoot : 2.4.3
아파치 카프가 공식 홈페이지 에서 다운로드 (https://kafka.apache.org/downloads)
나는 리눅스에서 wget으로 다운로드, 설치는 압축을 풀면 끝
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
※ 1년 전에 처음 자료 정리하고 테스트할 때는 이상 없었는데, 오늘 다시 테스트 해보니 Consumer에서 "could not be established. Broker may not be available" 오류 메세지 출력 되었다.
→ server.properties에 advertised.listeners에 IP와 포트를 지정해서 문제 해결
./bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
build.gradle에 아래 의존성 추가
// for apache kafka
implementation 'org.springframework.kafka:spring-kafka'
spring:
kafka:
bootstrap-servers:
- 192.168.0.4:9092
consumer:
# consumer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 192.168.0.4:9092
# 식별 가능한 Consumer Group Id
group-id: testgroup
# Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
# latest: 가장 최근에 생산된 메시지로 offeset reset
# earliest: 가장 오래된 메시지로 offeset reset
# none: offset 정보가 없으면 Exception 발생
auto-offset-reset: earliest
# 데이터를 받아올 때, key/value를 역직렬화
# JSON 데이터를 받아올 것이라면 JsonDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# producer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 데이터를 보낼 때, key/value를 직렬화
# JSON 데이터를 보낼 것이라면 JsonDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
1) spring.kafka.consumer
- bootstrap-servers
- kafka 클러스터에 대한 초기 연결에 사용할 IP 목록, 쉼표로 구분
- group-id
- Consumer는 Consumer Group이 존재하고, 유일하게 식별할 수 있는 Consumer Group ID
- auto-offset-reset
- Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성
- Consumer Group의 Consumer는 메시지를 소비할 때 Topic 내에 Partition에서 다음에 소비할 offset이 어디인지 공유하고 있다. 그런데 오류 등으로 인해 offset 정보가 없어졌을 때 어떻게 offset을 reset 할 것인지를 명시
- latest : 가장 최근에 생산된 메시지로 offset reset
- earliest : 가장 오래된 메시지로 offset reset
- none : offset 정보가 없으면 Exception 발생
- key-deserializer / value-deserializer
- Kafka에서 데이터를 받아올 때 key/value를 역직렬화 한다.
- key, value는 KafkaTemplate의 key, value를 의미한다.
- 메시지가 문자열 데이터이면 StringDeserializer를 사용하고, JSON 데이터면 JsonDeserializer도 가능하다.
2) spring.kafka.producer
- bootstrap-servers
- consumer.bootstrap-server와 동일하지만 producer 전용으로 오버라이딩 하려면 작성한다.
- key-serializer / value-serializer
- Kafka에 데이터를 보낼때 key/value를 직렬화 한다.
KafkaTemplate에 Topic명과 메시지를 전달한다. kafkaTemplate.send() 메서드가 실행되면 Kafka 서버로 메시지가 전송된다.
@Service
public class KafkaProducer {
@Value(value = "${message.topic.name}")
private String topicName;
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(topicName, message);
}
}
Kafka로 부터 메시지를 받으려면 @KafkaListener 어노테이션을 달아주면 된다.
@Service
public class KafkaConsumer {
@KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s", message));
}
}
@Slf4j
@CrossOrigin("*")
@RestController
@RequestMapping(value = "/kafka/test")
public class SampleController {
private final KafkaProducer producer;
@Autowired
SampleController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping(value = "/message")
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
카프카와 스프링부트를 실행시키고 연동상태를 확인한 후 포스트맨에서 컨트롤러의 sendMessage를 호출하면 파라미터로 받은 메시지를 Producer로 카프카에 전송하고 Consumer에서 이를 수신하여 출력되는 것을 볼 수 있다.
※ Producer와 Consumer 설정을 application.properties에 작성했지만 설정을 여러 개로 관리하고 싶다면 Bean으로 구현하는 것이 좋은 방법이다
안녕하세요. 덕분에 초기 설정을 쉽게 했습니다.
한 가지 말씀드릴게 있다면
"4.4 Kafka Consumer 클래스"에서
@KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
이렇게 작성하셨는데요. 이렇게하면 Consumer group id가 application.yml에서 설정한 testgroup이 아니라 group.id로 뜨게 됩니다.
"groupId = ConsumerConfig.GROUP_ID_CONFIG"를 지우니 testgroup으로 나오네요.
"ConsumerConfig.GROUP_ID_CONFIG"는 "group.id" 값을 가진 상수로 보이는데 혹시 이렇게 설정하신 이유가 있을까요?
좋은 글 작성해주셔서 정말 감사합니다.