kafka 클러스터 구축 및 Spring app 연동 테스트

박정호·2025년 5월 14일

Kafka 클러스터를 클라우드 환경에 올리기 전에 온프레미스에서 간단히 연동하여 테스트 해보고자 진행했다.
테스트 목적이기 때문에 broker들을 docker-compose에 컨테이너로 띄운 뒤, 클러스터를 구축했다.

docker 설치

sudo apt update
sudo apt install -y ca-certificates curl gnupg lsb-release

# Docker GPG 키 추가
sudo mkdir -p /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg

# Docker 저장소 추가
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] \
  https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

# Docker 설치
sudo apt update
sudo apt install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

# 현재 사용자에게 Docker 권한 부여 (재로그인 필요)
sudo usermod -aG docker $USER

newgrp docker

Docker Compose 설치 (v2 방식)

Docker Compose V2는 docker compose로 실행되며, 다음과 같이 설치합니다:

sudo apt update
sudo apt install docker.io -y

# Docker Compose CLI 플러그인 설치
sudo apt install docker-compose-plugin -y

# 현재 유저를 docker 그룹에 추가 (로그아웃/로그인 필요)
sudo usermod -aG docker $USER

newgrp docker

# 정상 설치 확인
docker compose version

예상 출력:

Docker Compose version v2.22.0

VM 3개 준비 사항

  • 각 vm 사양은 RAM 4gb, 우선 2cpu, disc 25 gb 줬음
  • 각 vm ip 는 10.0.2.25, 10.0.2.30, 10.0.2.35
  • 각 vm 엔 docker 와 docker-compose 설치를 마침

Broker1의 VM 에서 할 일

# 서버 1에서만 실행
CLUSTER_ID=$(docker run --rm bitnami/kafka:3.6.1 kafka-storage.sh random-uuid)
echo "Generated Cluster ID: $CLUSTER_ID"

출력된 클러스터ID를 복사

후 각 yml 파일에 클러스터 아이디로 넣어줄 예정

Broker1의 docker-compose.yml

version: '3'
services:
  kafka:
    image: bitnami/kafka:3.6.1
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER1_IP>:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID> # 메모해둔 값을 넣으셈 
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=3
    volumes:
      - ./kafka_data:/bitnami/kafka
    restart: always
    user: root

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=auto-driving
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.0.2.133:9092,10.0.2.136:9092,10.0.2.138:9092
    restart: always
  • 브로커 클러스터에 대한 ui는 broker1의 vm에서만 보려고 여기만 작성해줌(그냥 귀찮아서..)

broker2 docker-compose.yml

version: '3'
services:
  kafka:
    image: bitnami/kafka:3.6.1
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER2_IP>:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID>
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - ./kafka_data:/bitnami/kafka
    restart: always
    user: root

broker3 docker-compose.yml

version: '3'
services:
  kafka:
    image: bitnami/kafka:3.6.1
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_NODE_ID=3
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@<BROKER1_IP>:9093,2@<BROKER2_IP>:9093,3@<BROKER3_IP>:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<BROKER3_IP>:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=<YOUR_CLUSTER_ID>
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - ./kafka_data:/bitnami/kafka
    restart: always
    user: root

현재 한 것

서버 1에는 카프카와 카프카 ui 설치

서버 2에는 카프카 설치

서버 3에는 카프카 설치

그리고 각 브로커들을 클러스터로 구성

Broker1~3 순서대로 실행 시키는 것을 권장

docker-compose up -d

이후 서버 1에서 설치한 카프카 ui 접속

localhost:8989

Spring app 설정

build.gradle

//kafka
implementation 'org.springframework.kafka:spring-kafka'

application.yml

spring:
	  kafka:
    bootstrap-servers: <BROKER1_IP>:9092,<BROKER2_IP>:9092,<BROKER3_IP>:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

우선 테스트를 위해 간단한 메세지 발생 코드만 구성했다.

KafkaProducerService.java

package org.example.backend.domain.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@RequiredArgsConstructor
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

KafkaConsumerService.java

package org.example.backend.domain.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Consumed message: " + message);
    }
}

KafkaTestController.java

package org.example.backend.domain.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RequiredArgsConstructor
@RestController
public class KafkaTestController {

    private final KafkaProducerService producerService;

    @PostMapping("/send")
    public String send(@RequestParam String message) {
        producerService.sendMessage("test-topic", message);
        return "Message sent: " + message;
    }
}

트러블 슈팅

클러스터를 구성하고 연동하는 과정에서 문제 생김

2025-05-14T09:46:54.933+09:00  INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 11572 ms.
2025-05-14T09:47:03.911+09:00  INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 3 due to socket connection setup timeout. The timeout value is 8868 ms.
2025-05-14T09:47:22.749+09:00  INFO 4460 --- [backend] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-test-group-1, groupId=test-group] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 18551 ms.

로그의 종류는 INFO이지만 내용을 보면 disconnecting으로 카프카 클러스터와 연결이 실패하고 있는 것을 확인했다.

내 로컬에서 VM에 통신이 되는지 ping을 날려봤는데 ping이 가지 않았다. virtualbox에서 분명 NAT 네트워크로 내 로컬 호스트와 포트포워딩을 해줘도 해결되지 않았다.

해결

현재 문제 상황이 내 로컬환경에서 VM을 인식?하지 못하는 문제였기 때문에 VM으로 ping이 가도록 설정해줬다.

VM의 네트워크를 브리지로 설정한 뒤, DHCP를 다시 활성화 시켜서 동적으로 새로운 Iip를 받아왔다.

이후 내 로컬에서 VM으로 ping 테스트를 했고 성공했다.

새로 부여 받은 broker들의 ip를 application.yml에 추가했고 카프카 브로커 연결에 성공했다.

아래는 간단한 메세지 테스트 결과이다.

성공화면

0개의 댓글