이 글에서 작성된 대부분의 세팅 방법은 https://youtu.be/QngHCFFsa00 을 기반으로 작성되었음을 알립니다. 다만 저의 경우에는 영상대로 따라하면 안되는 부분도 약간 있어서 영상과 조금씩 내용이 다를 수 있습니다. 그리고 영상에서는 Spring Boot 3.x.x 버전을 사용하지만, 저는 Spring boot 2.7.11 버전을 사용한다는 약간의 차이점이 있습니다.
이론 공부도 중요하지만, 일단 한번 실행을 해보고 나서 공부하면 저는 효과가 좋더군요.
그래서 일단 Kafka 가 정확히 뭔지는 몰라도 Spring boot 로 Kafka 와 통신을 하는
초간단 애플리케이션을 생성해보려 합니다. 지금부터 시작해보겠습니다.
development tools and environment:
- OS :
Window 10 Home Edition
- IDE :
Intellij Ultimate
- framework :
Spring boot v2.7.11
- jdk vendor(version) :
azul-17
- CLI :
PowerShell 7
CLI 가
Powershell
이다 보니 콘솔창 명령 입력 시 줄바꿈을 위해서 ` (백틱) 을 사용합니다.Bash
계열의 CLI 개발자 분들은 해당 문자를\
로 대체해서 작성하시기 바랍니다.
프로젝트 생성 후에 pom.xml 에 가서 추가적인 설정을 하나만 더 해줍니다.
이렇게 안하면 PowerShell
에서 \mvnw spring-boot:run
명령어를 입력하면 에러가 발생합니다.
<properties>
<java.version>17</java.version>
<!-- 아래 설정 추가, 이렇게 안하면 가끔 인코딩 안맞아서 실행이 안되는 경우가 있음 -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
1-1. 생성위치
1-2. 작성법
---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.2
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
여기서 9092:9092
포트를 잘 봐두시기 바랍니다.
저희가 이후에 spring-boot 애플리케이션에서 저 포트로 kafka container instance 와 통신을 할 겁니다.
Powershell(콘솔창)을 열고 docker-compose.yml
파일이 있는 경로에서 가서 아래 명령어를 입력하여 docker container 가 실행됩니다.
docker compose up
실행하면 대충 아래와 같은 화면이 나오면서 정상적으로 docker-compose.yml 에 작성했던 2개의 container 가 실행되는 것을 확인할 수 있습니다.
정말 제대로 실행이 됐는지 다시 한번 확인하기 위해서 새로운 Powershell(콘솔창) 을 실행하여 docker ps -a
를 통해서 확인합니다. 저는 아래처럼 나오네요.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
65bb18ab9cfd confluentinc/cp-kafka:7.3.2 "/etc/confluent/dock…" 41 seconds ago Up 40 seconds 0.0.0.0:9092->9092/tcp broker
09d649a97939 confluentinc/cp-zookeeper:7.3.2 "/etc/confluent/dock…" 42 seconds ago Up 40 seconds 2181/tcp, 2888/tcp, 3888/tcp zookeeper
새로운 Powershell 콘솔창을 열고 아래 명령어를 쳐줍니다.
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic quickstart
이후, Docker 로 띄운 Kafak container 에 아래 명령어를 전달합니다.
자세히 보면 kafka consumer 콘솔에 명령어를 내리는 걸 확인할 수 있고,
이때 topic 을 quickstart 로 지정하는 것을 확인 할 수 있습니다.
docker exec --interactive --tty broker `
kafka-console-consumer --bootstrap-server broker:9092 `
--topic quickstart `
--from-beginning
이러면 지금부터 새로운 메세지가 quickstart 라는 토픽에 들어오면 로그가 콘솔 창에 찍히게 됩니다. 참고로 반대로 produce 를 하고 싶으면 아래처럼 명령어를 입력하면 됩니다.
docker exec --interactive --tty broker `
kafka-console-producer --bootstrap-server broker:9092 `
--topic quickstart
저희는 spring boot app 을 통해서 메세지를 kafka 에 전송할 예정입니다.
전체적인 spring boot app 코드는 github 에 올려놨습니다.
https://github.com/CodingToastBread/kafka-setting
이 글에서는 핵심적인 세팅에 대한 것만 기록하겠습니다.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=events
package toast.bread.springkafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import toast.bread.springkafka.config.KafkaConfigProps;
import toast.bread.springkafka.domain.CustomerVisitEvent;
import java.time.LocalDateTime;
import java.util.UUID;
@SpringBootApplication
@EnableConfigurationProperties
public class SpringKafkaApplication {
@Autowired
private ObjectMapper objectMapper;
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Bean
public ApplicationRunner runner(final KafkaTemplate<String, String> kafkaTemplate, final KafkaConfigProps kafkaConfigProps) throws JsonProcessingException {
final CustomerVisitEvent event = CustomerVisitEvent.builder()
.customerId(UUID.randomUUID().toString())
.dateTime(LocalDateTime.now())
.build();
final String payload = objectMapper.writeValueAsString(event);
return args -> kafkaTemplate.send(kafkaConfigProps.getTopic(), payload);
}
@KafkaListener(topics = "quickstart")
public String listens(final String in) {
System.out.println(in);
return in;
}
}
runner 메소드는 Spring ApplicationContext 가 완전히 초기화되면 자동으로 return 한 Lambda 식이 실행됩니다. 이때 KafkaTemplate 을 통해서 Json 문자열을 Kafka 에 전송하도록 합니다.
반대로 @KafkaListener
는 Kafka 에 quickstart 라는 토픽에 Event 가 오면
해당 메세지를 받는 메소드입니다. 참고로 이걸 하나라도 생성하면 spring application 실행 후 자동 종료되지 않고, 계속해서 Kafka 에서 오는 메세지를 Listening 하기 위해 대기를 합니다.
spring boot 프로젝트 root 에 가서 새로운 콘솔창(PowerShell) 창을 열고
아래처럼 명령어를 입력합니다.
.\mvnw spring-boot:run
pom.xml 에서 plugin 태그를 하나 추가하고, mvn update 를 해준 합니다.
그리고 나서 maven clean 하고 다시 mvn spring-boot:run
명령어를 실행해보세요.
<build>
<plugins>
<!-- 이 아래 plugin 하나를 추가한다. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- 얘는 기존에 있던것임. 건들지 말것 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>