안녕하세요 이번 포스팅에서는 Kafka
서버를 구축하고 Spring Boot
서버와 연동시키는 간단한 실습을 해보겠습니다.
[서버] Kafka 에 대해서 포스팅을 먼저 참고해주세요 ❗️
가장 먼저, Apache Kafka 다운로드 공식 홈페이지에 들어가 Kafka 최신 버전을 다운받습니다.
저는 2023 년 10월에 출시된 3.6.0
버전을 다운받겠습니다.
저는 다운로드 받은 압축 파일을 제 로컬 기준 Desktop/ForCoding
폴더에 위치시키겠습니다.
해당 이미지 처럼 다운로드 받은 Kafka 관련 폴더와 tgz 파일이 하나씩 존재합니다.
이번에는 kafka_2.13-3.6.0
폴더에 들어가서 구성을 살펴보겠습니다 ❗️
여기서 주로 사용하는 폴더는 bin
과 config
폴더입니다.
bin
폴더는 kafka 관련 각종 실행 쉘 커맨드 파일이 있으며, config
폴더에는 설정 관련 파일이 존재합니다.
모든 쉘 커맨드 실행 위치는
kafka_2.13-3.6.0
입니다.
첫번째로, Kakfa Cluster 을 관리하고 분산 처리를 조정하는 역할을 하는 Zookeeper
을 구동시킵니다.
Zookeeper
는 밑에 있는 쉘 커맨드를 통해 구동시키겠습니다.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
명령어에서 알 수 있듯이, 현재 위치에서 bin
폴더에 존재하는 zookeeper-server-start.sh
쉘 커맨드를 구동하는데 이때, config
폴더에 존재하는 zookeepr.properties
설정 파일을 참조합니다.
Zookeeper
는 기본적으로 2181
포트에서 구동됩니다 ❗️
Kafka 의 핵심인 Kafka Broker
을 구동해보겠습니다.
주의할 점은 Zookeeper
서버가 구동된 상태일때만 Kafka Broker
구동이 가능합니다 👨💻
마찬가지로 Apache Kafka 에서 다운받았던 Kafka 폴더로 제공되는 쉘 커맨드를 통해 구동합니다.
./bin/kafka-server-start.sh ./config/server.properties
명령어 구성이 Zookeeper
서버를 구동할때랑 비슷합니다.
bin
폴더에 존재하는 kafka-server-start.sh
쉘 커맨드를 구동하며, cofig
폴더의 server.properties
설정 파일을 참조합니다.
Kafka Broker 는 기본적으로 9092
포트에서 구동됩니다 ❗️
마지막으로 구동시킨 Kafka Broker
에 my-topic
이라는 topic 을 만들겠습니다.
다운받았던 Kafka 폴더는 topic 관련 쉘 커맨드 또한 제공합니다.
topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
여기서 bootstrap-server
란 Kafka Cluster
접속하기 위한 초기 연결 주소를 나타냅니다.
과거 Kafka 버전에서는 Zookeeper
을 통해 초기 연결을 진행했으나, 최신 버전에서는 bootstrap-server
옵션을 통해 클라이언트가 Kafka Broker
에 직접 연결을 시도합니다.
이를 통해 Kafka Broker
에 연결할 수 있으며, 해당 주소를 통해 Kafka 클라이언트는 Topic 을 생성하거나 데이터를 전송할 수 있습니다.
여기서는 로컬의 9092 포트 즉, 앞에서 생성한 Kafka Broker
연결을 했습니다.
또한, 여기서 Partitons
이란 topic 을 구성하는 partition 의 복제수를 의미합니다.
Kafka 의 고가용성(High Availability)
을 보장합니다 ❗️
: [Kafka] Kafka Topic Replication
topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
해당 명령어를 통해 생성한 topic 을 삭제할 수 있습니다.
topic 목록 확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
이번에는 생성한 topic 목록을 확인해보겠습니다.
이미지 처럼 생성한 topic 인 my-topic
이 조회되는 것을 확인할 수 있습니다.
topic 정보 확인
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092
해당 명령어를 통해 생성한 topic 의 정보을 확인할 수 있습니다.
topic 이름, topic id , 복제수 등의 정보를 확인할 수 있습니다 👨💻
이렇게 간단하게 쉘 커맨드와 설정 파일을 통해 Zookeeper
서버와 Kafka Broker
을 구동시켰습니다 ❗️
지금까지 Apache Kafka 홈페이지에서 다운로드 받은 Kafka 을 이용해서 Zookeeper 와 Kafka Broker 을 구동시켜보았습니다.
여기서는 Docker
을 이용해서 동일하게 Zookeeper 와 Kafka 을 구동시켜보겠습니다 ❗️
이번에는 메세지(이벤트) 을 생성하는 Producer 와 Consumer 을 쉘 커맨드를 통해 구현해보고 테스트해보겠습니다.
Producer 와 Consumer 을 구동하기 위해서는 앞서 구동시켰던 Zookeeper
와 Kafka Broker
가 구동된 상태여야 합니다 ❗️
./bin/kafka-console-producer.sh --broker-list localhost 9092 --topic [발행할 topic 이름]
동일하게 다운받았던 Kafka 폴더로 제공된 쉘 커맨드로 구동됩니다.
명령어가 정상적으로 처리가 되어 >
입력창이 뜨는 것을 확인할 수 있습니다.
여기에 메세지(이벤트) 을 발행하면 해당 topic(=my-topic) 을 구독한 Consumer 가 해당 메세지(이벤트) 을 읽어올 수 있습니다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [구독할 topic 이름] --from-beginning
해당 명령어를 통해 my-topic 을 구독하는 Consumer 을 구동시킬 수 있습니다.
이때 --from-beginning
옵션은 Consumer 가 구독하는 topic 에 저장된 모든 이전 메세지까지 읽어오는 기능입니다.
이번 파트에서는 Kafka 와 Spring boot 애플리케이션을 연동해보겠습니다 ❗️
Spring boot 을 통해 Producer
와 Consumer
을 구현할 것이기 때문에 앞서 구현했던 Zookeeper
와 Kafka Broker
가 구동된 상태여야 합니다.
자세한 코드는 여기 에서 확인 가능합니다.
implementation 'org.springframework.kafka:spring-kafka'
spring kafka
프로젝트는 Spring 에서 Apache Kafka 을 쉽게 사용할 수 있도록 추상화하여 제공하는 프로젝트입니다.
Producer,Consumer 애플리케이션의 build.gradle
에 이와 같이 의존성을 추가해줍니다 ❗️
application.yml
server:
port: 8080
spring:
application:
name: producer_application
kafka:
producer:
bootstrap-servers: localhost:9092 # Kafka 클러스터에 대한 초기 연결에 사용할 호스트 : 포트 목록
## serializer 방법은 KafkaProducerConfig 로 설정
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
name: my-topic
KafkaProducerConfig.Class
@Configuration
public class KafkaProducerConfig {
private final Environment env;
KafkaProducerConfig(Environment environment) {
this.env = environment;
}
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
env.getProperty("spring.kafka.producer.bootstrap-servers"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(this.producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
: #03 Spring boot & Kafka 연동하기
KafkaProducerConfig
클래스는 실제 Kafka Broker 에 메세지를 보내는 역할을 하는 KafkaTemplate
을 구성하고 Bean 으로 등록하는 역할을 합니다.
Producer 설정을 하는 방법에는 application.yml 작성 법과 Bean 등록 법이 있는데 이번 포스팅에서는 둘을 적절히 혼합했습니다.
KafkaProducerController
& KafkaProducerService
저는 API 호출을 통해 KafkaProducerController
로 쿼리 스트링으로 메세지를 전달하고 KafkaProducerService
에서 KafkaTemplate
을 통해 Kafka Broker 로 메세지를 전송하려고 합니다.
// KafkaProducerController
@RequiredArgsConstructor
@RequestMapping("/kafka")
@RestController
public class KafkaProducerController {
private final KafkaProducerService kafkaProducerService;
@PostMapping
public ResponseEntity<Void> sendMassage(
@RequestParam String message) {
this.kafkaProducerService.sendMessageToKafka(message);
return ResponseEntity.ok().build();
}
}
// KafkaProducerService
/* 바라보고 있는 kafka broker topic 메세지 발행 */
@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaProducerService {
@Value("${topic.name}")
private String topicName;
/* Kafka Template 을 이용해 Kafka Broker 전송 */
private final KafkaTemplate<String,String> kafkaTemplate;
public void sendMessageToKafka(String message) {
System.out.printf("Producer Message : %s%n",message);
this.kafkaTemplate.send(topicName,message);
}
}
Consumer application (ConsumerApplication01
, ConsumerApplication02
) 을 구동할 것입니다.
두 application 의 구성(이름 제외)은 완전히 동일함으로 ConsumerApplication01
하나만 살펴보겠습니다 ❗️
application.yml
server:
port: 8090
spring:
application:
name: consumer_application01
kafka:
consumer:
bootstrap-servers: localhost:9092 # Kafka 클러스에 대한 초기 연결에 사용할 호스트 : 포트 목록
group-id: consumer_group01 # Group Id
auto-offset-reset: earliest # offset 이 없거나 더 이상 없는 경우 어떻게 처리할지 전략 결정
## Deserialze 방법은 KafkaConsumerConfig 로 설정
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumerConfig
@EnableKafka # 필수 어노테이션
@Configuration
public class KafkaConsumerConfig {
private final Environment env;
KafkaConsumerConfig(Environment env) {
this.env = env;
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,env.getProperty("spring.kafka.consumer.bootstrap-servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,env.getProperty("spring.kafka.consumer.group-id"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,env.getProperty("spring.kafka.consumer.auto-offset-reset"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.consumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}
}
KafkaConsumerConfig
클래스는 ConcurrentKafkaListenerContainerFactory
클래스를 생성하고 ConsumerFactory
인터페이스를 내부 멤버변수에 Set 하고 Bean 으로 등록하기 위한 클래스입니다.
KafkaConsumerService
// ConsumerApplication01 의 KafkaConsumerService01
@Slf4j
@Service
public class KafkaConsumerService01 {
@KafkaListener(topics = "my-topic", groupId = "consumer_group01")
public void consume(String message) throws IOException {
System.out.printf("Consumed Message : %s%n", message);
}
}
여기서는 해당 Consumer Application 이 구독한 Kafka Broker 의 Topic 의 Partition 에서 message 을 받아와서 출력하는 코드로 구성됩니다.
@KafkaListener
속성에 구독하는 Topic 과 해당 Consumer 가 속하는 Group Id 을 설정 만 해주면 끝납니다 ❗️
먼저 아까 구현한 KafkaProducerController
API 호출을 진행합니다.
쿼리 스트링으로 new message 라는 메세지를 my-topic
에 발행해보겠습니다.
해당 이미지는 실제 구동된 ConsumerApplication01
과 ConsumerApplication02
콘솔에 찍힌 전달된 메세지 결과입니다.
ProducerApplication
부터 KafkaBroker
을 거쳐 두개의 ConsumerApplication
까지 메세지가 잘 전달되는 것을 확인할 수 있습니다.
메세지는 Group 이 같은 Consumer 에게는 라운드 로빈 방식으로 전달되고, Group 이 다른 Consumer 에게는 모두 메세지가 전달됩니다 ❗️
이번 포스팅에서는 간단한 Kafka System
을 구축하고 Spring boot
와 연동해보았습니다 ❗️
비록 아주 기본적인 실습이였지만 이번 포스팅을 통해 기존에 Kafka 에 대해 잘못 이해했던 부분들을 바로 잡을 수 있었던 좋은 시간이였습니다.
현재 Kafka
는 많은 기업에서 사용하고 있는 만큼, 개인적으로 실제 프로젝트에 도입해보고 싶은 생각이있습니다.
긴글 읽어주셔서 감사합니다 🙏
[SpringBoot] 카프카와 스프링부트 연동
🙈[SpringBoot] Kafka 연동하기🐵
Spring Boot에서 Apache Kafka 사용 ... 1/2
Spring Kafka 프로젝트
Producer & Consumer 설정을 위한 class 작성