Kafka에서 Zookeeper에 의존성을 제거하고 Apache Kafka 자체적으로 분산합의 알고리즘을 책정하는 방식으로 간다고 한다. (KRAFT 방식)
bitnami/Kafka 특정 버전 이상부터 (아마 3.2) Production 환경에서도 쓸 수 있을만큼 지원을 해준다고 한다.
현재 내 EC2서버에 docker를 설치하고 compose file을 작성해 bitnami/kafka를 내 로컬 환경 Spring에서 사용하려고 한다.
우선 Ubuntu 환경에 docker와 docker Compose가 깔려있고 hub에서 bitnami/kafka 3.4 버전을 받았다는 전제하에 시작한다.
# Docker 설치
sudo apt-get install -y docker-ce
# Download the latest version of Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# Make the binary executable
sudo chmod +x /usr/local/bin/docker-compose
# Verify the installation
docker-compose --version
# kafka image
docker pull bitnami/kafka:3.4
Docker Compose를 작성해 내가 원하는 세팅으로 container를 실행한다
# 텍스트 편집기 이동
nano docker-compose.yml
# 내부 내용 작성
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.4'
container_name: kafka-server
ports:
- '9094:9094'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://현재 공개 IP(EC2 IP):9094
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
- ALLOW_PLAINTEXT_LISTENER=yes
가장 중요한 부분이 KAFKA_CFG_LISTENERS, KAFKA_CFG_ADVERTISED_LISTENERS인데
Docker에 network 설정시 내부적으로 사용할 9092포트가 있고 외부 PC에서 접속할 EXTERNAL의 경우 현재 kafka가 설치된 PC의 IP주소와 사용할 포트를 적어주고 컨테이너 ports를 설정한다.
(기본적으로 해당 PC의 인바운드 규칙을 수정해 9094포트에 대한 접근을 허용해준다.)
MQ로 사용할 Topic을 생성하고 producer 이용해 메세지를 보내보자.
sudo docker exec -it kafka-server kafka-console-producer.sh --producer.config /opt/bitnami/kafka/config/producer.properties --bootstrap-server 현재 공개 IP(EC2 IP):9094 --topic test
콘솔에 메세지를 입력하고 consumer에서 확인해본다.
sudo docker exec -it kafka-server kafka-console-consumer.sh --consumer.config /opt/bitnami/kafka/config/consumer.properties --bootstrap-server 현재 공개 IP(EC2 IP):9094 --topic test --from-beginning
EC2내부적으로 확인 후 Spring의 KafkaProducerConfig.class를 작성한다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"현재 공개 IP(EC2 IP):9094");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
빈으로 등록한 KafkaTemplate를 이용해 맛있게 잘 사용해주면된다.
@Service
@Slf4j
@Data
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public MessageDto send(String topic, MessageDto messageDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(messageDto);
}catch(JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Ui microservice: " + messageDto);
return messageDto;
}
}
EC2 consumer에 메세지가 잘 전달되는걸 볼 수 있다.