Kafka를 사용하려면 먼저 최신 릴리스를 다운로드해야 합니다. Scala 버전에 따라 바이너리를 선택해야 하며, 예시는 Scala 2.13과 Kafka 4.0.0 기준입니다.
$ tar -xzf kafka_2.13-4.0.0.tgz
$ cd kafka_2.13-4.0.0
✅ TIP: 로컬 실행을 위해 Java 17 이상이 설치되어 있어야 합니다.
# 클러스터 UUID 생성
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# 로그 디렉토리 포맷
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
# Kafka 서버 실행
$ bin/kafka-server-start.sh config/server.properties
$ docker pull apache/kafka:4.0.0
$ docker run -p 9092:9092 apache/kafka:4.0.0
$ docker pull apache/kafka-native:4.0.0
$ docker run -p 9092:9092 apache/kafka-native:4.0.0
✅ TIP: 도커로 빠르게 테스트 환경을 만들 수 있지만, 로그 볼륨 마운트 등 설정도 함께 고려하세요.
Kafka에서 이벤트는 토픽 단위로 저장됩니다. 토픽은 파일 시스템의 폴더와 유사합니다.
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
토픽 정보 확인:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
🚀 한 줄 입력마다 개별 이벤트로 저장됩니다.
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Ctrl-C로 종료할 수 있으며, 복수의 콘슈머가 동시에 동일 토픽을 소비하는 것도 가능합니다.
파일 → Kafka 토픽 → 파일 구조의 간단한 파이프라인을 구성할 수 있습니다.
$ echo "plugin.path=libs/connect-file-4.0.0.jar" >> config/connect-standalone.properties
$ echo -e "foo\nbar" > test.txt
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
$ more test.sink.txt
foo
bar
Kafka 토픽에서 직접 확인도 가능:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Kafka Streams는 Kafka 토픽의 데이터를 실시간으로 처리할 수 있는 Java/Scala 클라이언트 라이브러리입니다.
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
☑️ 실시간 집계, 윈도우, 조인 등 복잡한 스트림 연산을 처리할 수 있어 실시간 데이터 분석에 적합합니다.
# Kafka 종료: Ctrl-C
# 로그/데이터 삭제
$ rm -rf /tmp/kafka-logs /tmp/kraft-combined-logs