Kafka 클러스터를 클라우드 환경에 올리기 전에 온프레미스에서 간단히 연동하여 테스트 해보고자 진행했다.
테스트 목적이기 때문에 broker들을 docker-compose에 컨테이너로 띄운 뒤, 클러스터를 구축했다.
sudo apt update
sudo apt install -y ca-certificates curl gnupg lsb-release
# Docker GPG 키 추가
sudo mkdir -p /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
# Docker 저장소 추가
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] \
https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
# Docker 설치
sudo apt update
sudo apt install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
# 현재 사용자에게 Docker 권한 부여 (재로그인 필요)
sudo usermod -aG docker $USER
newgrp docker
Docker Compose V2는 docker compose로 실행되며, 다음과 같이 설치합니다:
sudo apt update
sudo apt install docker.io -y
# Docker Compose CLI 플러그인 설치
sudo apt install docker-compose-plugin -y
# 현재 유저를 docker 그룹에 추가 (로그아웃/로그인 필요)
sudo usermod -aG docker $USER
newgrp docker
# 정상 설치 확인
docker compose version
예상 출력:
Docker Compose version v2.22.0
# 서버 1에서만 실행
CLUSTER_ID=$(docker run --rm bitnami/kafka:3.6.1 kafka-storage.sh random-uuid)
echo "Generated Cluster ID: $CLUSTER_ID"
출력된 클러스터ID를 복사
후 각 yml 파일에 클러스터 아이디로 넣어줄 예정
version: '3'
services:
kafka:
image: bitnami/kafka:3.6.1
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER1_IP>:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID> # 메모해둔 값을 넣으셈
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NUM_PARTITIONS=3
- KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=3
volumes:
- ./kafka_data:/bitnami/kafka
restart: always
user: root
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8989:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=auto-driving
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.0.2.133:9092,10.0.2.136:9092,10.0.2.138:9092
restart: always
version: '3'
services:
kafka:
image: bitnami/kafka:3.6.1
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_NODE_ID=2
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER2_IP>:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID>
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- ./kafka_data:/bitnami/kafka
restart: always
user: root
version: '3'
services:
kafka:
image: bitnami/kafka:3.6.1
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_NODE_ID=3
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER3_IP>:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID>
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- ./kafka_data:/bitnami/kafka
restart: always
user: root
서버 1에는 카프카와 카프카 ui 설치
서버 2에는 카프카 설치
서버 3에는 카프카 설치
그리고 각 브로커들을 클러스터로 구성
docker-compose up -d
localhost:8989
//kafka
implementation 'org.springframework.kafka:spring-kafka'
spring:
kafka:
bootstrap-servers: <BROKER1_IP>:9092,<BROKER2_IP>:9092,<BROKER3_IP>:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
우선 테스트를 위해 간단한 메세지 발생 코드만 구성했다.
package org.example.backend.domain.kafka;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
package org.example.backend.domain.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Consumed message: " + message);
}
}
package org.example.backend.domain.kafka;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
@RestController
public class KafkaTestController {
private final KafkaProducerService producerService;
@PostMapping("/send")
public String send(@RequestParam String message) {
producerService.sendMessage("test-topic", message);
return "Message sent: " + message;
}
}
클러스터를 구성하고 연동하는 과정에서 문제 생김
2025-05-14T09:46:54.933+09:00 INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 11572 ms.
2025-05-14T09:47:03.911+09:00 INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 3 due to socket connection setup timeout. The timeout value is 8868 ms.
2025-05-14T09:47:22.749+09:00 INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 18551 ms.
로그의 종류는 INFO이지만 내용을 보면 disconnecting으로 카프카 클러스터와 연결이 실패하고 있는 것을 확인했다.
내 로컬에서 VM에 통신이 되는지 ping을 날려봤는데 ping이 가지 않았다. virtualbox에서 분명 NAT 네트워크로 내 로컬 호스트와 포트포워딩을 해줘도 해결되지 않았다.
현재 문제 상황이 내 로컬환경에서 VM을 인식?하지 못하는 문제였기 때문에 VM으로 ping이 가도록 설정해줬다.
VM의 네트워크를 브리지로 설정한 뒤, DHCP를 다시 활성화 시켜서 동적으로 새로운 Iip를 받아왔다.
이후 내 로컬에서 VM으로 ping 테스트를 했고 성공했다.
새로 부여 받은 broker들의 ip를 application.yml에 추가했고 카프카 브로커 연결에 성공했다.
아래는 간단한 메세지 테스트 결과이다.


