이 글에서는 Windows 11 환경에서 Apache Kafka 2.8.0 버전을 설치하고 실행하는 방법을 상세하게 설명합니다.
설치 순서는 Kafka 다운로드, 파일 구성 확인, Zookeeper 및 Kafka 실행, 그리고 Kafka Topic 생성 순입니다
kafka_2.13-2.8.0.tgz
파일을 압축 해제하여 폴더로 변환합니다.kafka_2.12-3.6.0
폴더 안에는 주로 사용할 bin
과 config
폴더가 있습니다.bin/windows
폴더: Zookeeper와 Kafka를 실행 및 종료할 수 있는 배치 파일이 있습니다.config
폴더: Zookeeper와 Kafka 설정을 담고 있는 설정 파일들이 있습니다.Apache ZooKeeper는 분산 시스템에서 설정 관리, 이름 서비스, 동기화, 그리고 그룹 서비스와 같은 서비스를 제공하는 중앙 집중식 서비스입니다.
ZooKeeper는 트리 구조로 데이터를 저장하며, 각 노드는 znode라고 불립니다. ZooKeeper는 높은 가용성을 제공하고, 분산 시스템의 여러 컴포넌트 간에 일관된 뷰를 제공하며, 고가용성과 확장성을 제공합니다.
Apache Kafka는 실시간 스트림 처리를 위한 분산 메시지 브로커 시스템입니다.
Kafka는 대용량의 데이터 스트림을 처리하고, 여러 소비자에게 데이터를 분산시킬 수 있습니다.
Kafka는 높은 처리량, 확장성, 내결함성을 제공하며, 다양한 데이터 파이프라인과 스트리밍 애플리케이션에 사용됩니다.
Kafka는 내부적으로 ZooKeeper를 사용하여 클러스터의 메타데이터 관리, 리더 선출, 구성 관리, 그리고 브로커의 살아있음 확인(health checking)과 같은 여러 중요한 작업을 수행합니다.
이러한 방식으로, ZooKeeper는 Kafka 클러스터의 안정적이고 일관된 운영을 지원하는 역할을 수행합니다.
cmd
창을 열고 Kafka 폴더 위치로 이동합니다.bin\windows\zookeeper-server-start.bat config\zookeeper.properties
F:\hanghae\FinalProjects\kafka_2.13-3.6.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
입력 줄이 너무 깁니다.
명령 구문이 올바르지 않습니다.
-> 폴더 길이가 너무 길어서 문제 발생하였다.
F 드라이브 바로 밑에 설정해서 문제를 해결한다.
해결
서버가 정상적으로 부팅 됬음을 확인 할 수 있습니다.
cmd
창을 열어 줍니다.bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic dev-topic
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat: Windows 환경에서 Kafka 토픽을 관리하기 위한 스크립트의 경로입니다.Kafka Topic이 정상적으로 생성되면, C:\tmp\kafka-logs
위치에 해당 Topic의 폴더가 생성됩니다. 이제 Kafka 환경이 성공적으로 구축되었으며, 데이터 스트리밍 및 메시징 작업을 시작할 수 있습니다.
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.apache.kafka:kafka-clients'
org.springframework.kafka:spring-kafka
와 org.apache.kafka:kafka-clients
두 라이브러리 모두 Apache Kafka를 사용하기 위해 필요한 라이브러리들입니다. 하지만 각각의 라이브러리가 제공하는 기능과 목적이 조금 다릅니다.
org.apache.kafka:kafka-clients
org.springframework.kafka:spring-kafka
:
org.apache.kafka:kafka-clients
는 Kafka와 직접 통신하기 위한 기본적인 기능들을 제공하며, org.springframework.kafka:spring-kafka
는 이러한 Kafka 클라이언트를 Spring Framework 내에서 좀 더 편리하게 사용할 수 있도록 추가적인 기능들과 통합 기능들을 제공합니다.
# Kafka
# Consumer 설정
# Kafka 컨슈머가 연결할 브로커의 주소입니다. 여기서는 로컬에서 실행 중인 Kafka 브로커의 주소와 포트를 지정하고 있습니다.
spring.kafka.consumer.bootstrap-servers=localhost:9092
# 컨슈머 그룹의 ID를 설정합니다. 이 ID는 Kafka 클러스터 내에서 이 컨슈머 그룹을 식별하는 데 사용됩니다.
spring.kafka.consumer.group-id=test-consumer-group
# 오프셋이 초기화되어야 하는 상황(예: 처음 시작하는 컨슈머, 또는 오프셋이 더 이상 유효하지 않은 경우)에 사용할 오프셋 초기화 정책을 설정합니다.
# 'earliest'는 토픽의 처음부터 메시지를 읽기 시작하겠다는 것을 의미합니다.
spring.kafka.consumer.auto-offset-reset=earliest
# Kafka로부터 메시지의 키를 역직렬화하는 데 사용할 클래스를 지정합니다. 여기서는 문자열 역직렬화 클래스를 사용하고 있습니다.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka로부터 메시지의 값을 역직렬화하는 데 사용할 클래스를 지정합니다. 여기서도 문자열 역직렬화 클래스를 사용하고 있습니다.
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Producer 설정
# Kafka 프로듀서가 연결할 브로커의 주소입니다. 컨슈머 설정과 마찬가지로 로컬에서 실행 중인 Kafka 브로커의 주소와 포트를 지정하고 있습니다.
spring.kafka.producer.bootstrap-servers=localhost:9092
# Kafka로 메시지를 보낼 때 메시지의 키를 직렬화하는 데 사용할 클래스를 지정합니다. 여기서는 문자열 직렬화 클래스를 사용하고 있습니다.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Kafka로 메시지를 보낼 때 메시지의 값을 직렬화하는 데 사용할 클래스를 지정합니다. 여기서도 문자열 직렬화 클래스를 사용하고 있습니다.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducerService producer;
@PostMapping("/send")
public void send(@RequestParam("message") String message){
producer.sendMessage("test",message);
}
}
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test", groupId = "test-consumer-group")
public void consume(String message){
System.out.println("Received Message in group 'test-consumer-group': " + message);
}
}
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String topic, String message){
kafkaTemplate.send(topic,message);
}
}