Kafka를 스터디하는 김에 Spring Application과 이를 연동하여 메세지를 주고 받는 애플리케이션을 간단하게 작성해보려 한다.
docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
KAFKA_ADVERTISED_HOST_NAME은 카프카 클라이언트와 브로커가 통신을 하기 위해 처음으로 client가 부여받는 ip로 kafka client가 broker에게 통신 요청을 하고 이를 부여받으면 해당 ip 연결하여 사용 여부를 검사할 수 있다.
근데 만약 하나의 서버에 kafka client와 broker가 함께 설치 되어있다면 굳이 adv_host를 private ip로 부여할 필요 없이 localhost로 지정하면 그만이다.
나는 가상머신에 client와 broker를 1대 씩 설치하여 테스트할 것이기 때문에 일단 이 옵션을 127.0.0.1로 설정할 것이다.
$ docker-compose up -d
.
.
.
.
.
Creating kafka ... done
Creating zookeeper ... done
다음 명령어로 확인해보면 컨테이너가 잘 실행된 것을 볼 수 있다.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7c829867de43 wurstmeister/kafka:2.12-2.5.0 "start-kafka.sh" 2 hours ago Up 2 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
a03d6d6df04f wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 hours ago Up 2 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
다음은 kafka 서버의 설정을 약간 수정해줄 것이다. console로 테스트할 때는 딱히 문제가 없지만 여러 테스트를 용이하게 하기 위해서이다.
먼저, kafka 컨테이너로 접속한다.
$ docker exec -it kafka /bin/bash
$ bash# cd /opt/kafka/config
$ bash# vi server.properties
server.properteis
먼저 Socket Server Setting 부분에 listeners와 advertised.listeners의 주석 처리를 해제하고 후자에는 공인 ip를 작성한다.
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://공인ip:9092
delete.topic.enable=true
auto.create.topics.enable=true
delete.topic.enable
설정은 토픽을 삭제 할 수 있게 하기 위함이고 auto.create.topics.enable
설정은 토픽이 존재하지 않은 상태에서 토픽을 찾으려 할 때 자동으로 이를 생성해주는 옵션이다.
자세한 것은 여기에 기술해두었다.
그리고 이제 exit로 컨테이너를 나와 다시 재기동 시켜주면 된다. restart를 쓰면 kafka 버그 때문에 바로 안 올라가는 경우가 있으니 당황하지말고 stop 후 좀 대기하여 start로 재기동해보자.
$ docker stop kafka
$ docker start kafka
먼저 세팅한 kafka 시스템이 잘 동작하는지 테스트 해보자.
일단 메세지를 보낼 토픽을 생성해보자.
$ bash# cd /opt/kafka/bin
$ bash# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic exam-topic
위 명령어는 토픽의 각 파티션을 복제하는 replication-factor를 1개 파티션 수를 1개로 지정하여 토픽을 생성하겠다는 의미이다.
또, 카프카 분산 시스템은 커맨드라인으로 메세지를 전송할 수 있는 유틸을 제공한다. kafka-console-producer와 consumer로 메세지가 오고가는 것을 확인할 수 있다.
먼저 Terminal1에서 producer를 실행하여 메세지를 입력하고
Test Terminal1
$ bash# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
> This is a msg;
다른 터미널을 하나 켜서 consumer를 실행하면 전송한 메세지가 들어오는 것을 확인할 수 있다.
Test Terminal2
$ bash# kafka-console-consumer.sh --topic exam-topic --bootstrap-server localhost:9092 --from-beginning
This is a msg;
이제 Kafka가 제대로 동작하는 것을 확인했으니 메세지를 전송하고 받는 application을 만들어보자.
gradle 6.8.3
spring boot 2.7.1
당연히 먼저 kafka 연동을 위한 의존성을 추가해야한다. 간단한 로그를 위해 lombok도 추가했다.
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
그 다음은 application과 kafka의 연동 설정을 위해 yml 파일을 작성한다.
spring.kafka
bootstrap-servers
consumer
group-id
consumer group을 구별하기 위한 유니크한 id 작성
auto-offset-reset
key-deserializer/value-desrializer
producer
데이터 전송을 위한 KafkaProducerBean을 만든다. 그리고 이 ProducerFactory를 이용하는 KafkaTemlate을 통해 데이터를 send하면 된다.
만약 여러 데이터 타입을 전송하고 싶을 때는 Producer Bean을 여러 개 정의하면 될 듯 하다.
참고로 환경변수 파일을 통해 Kafka를 연동할 때 직렬화 방식을 지정하지 않으면 SpringSerializer가
default
로 설정된다.
하지만, java bean으로 설정 시 key, value의 직렬화 정보를 지정하지 않으면 default 값이 자동으로 구성되지 않으니 에러가 발생하지 않도록 조심하자.
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer.java
Configuration 파일에서 정의한 kafkaTemplate 빈을 이용해 “exam-topic”이라는 이름의 토픽을 전송할 수 있도록 메서드를 정의한다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "exam-topic";
@Autowired
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
log.info("Produce message : {}", message);
this.kafkaTemplate.send(TOPIC, message);
}
}
KafkaConsumer.java
다음은 exam-topic이라는 이름의 토픽이 전달됐다면 해당 메세지들을 가져올 수 있게끔 하는 KafkaListener를 달아 정의해주면 된다.
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "exam-topic", groupId = "foo")
public void consume(String message) throws IOException {
log.info("Consumed message : {}", message);
}
}
KafkaController.java
@RestController
@RequestMapping(value = "/kafka")
@Slf4j
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer){
this.producer = producer;
}
@PostMapping
@ResponseBody
public String sendMessage(@RequestParam String message) {
log.info("message : {}", message);
this.producer.sendMessage(message);
return "success";
}
}
이제 애플리케이션을 구동시켜 작성한 코드가 잘 돌아가는지 확인해보자.
먼저 terminal로 kafka-console-consumer.sh 을 동작시키고
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam-topic --from-beginning
api를 호출해본다.
그러면 terminal에 consumer가 끌어온 메세지 내용들이 쭉 찍히고
애플리케이션에서도 찍은 로그에 주고 받은 메세지 내용이 찍힌 것을 확인할 수 있다.
More Kafka3-1. 카프카 프로듀서 · 앞으로 점프
SpringBoot Kafka 연동하기 :: victolee
#03 Spring boot & Kafka 연동하기