Kafka를 설치하면 여러 CLI(Command Line Interface) 도구가 함께 제공된다.
이 도구들을 통해 토픽 생성, 메시지 송수신, 컨슈머 그룹 관리 등
Kafka의 핵심 기능을 직접 제어할 수 있다.
Kafka 3.x 이후 버전에서는
모든 CLI 명령어가 --bootstrap-server 옵션을 사용하도록 통합되었으며,
Zookeeper 기반 옵션(--zookeeper)은 제거되었다.
📌 CLI 명령어 실행 형태
kafka-topics.sh --bootstrap-server localhost:9092 --list
Windows에서는 .bat,
Mac·Linux·WSL2에서는 .sh 확장자를 사용한다.
만약 $PATH 설정이 잘못되었다면 Kafka 바이너리 전체 경로로 접근해야 한다.
예시:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Kafka는 데이터를 토픽(Topic) 단위로 저장한다.
CLI를 통해 토픽을 생성·조회·수정·삭제할 수 있다.
| 기능 | 명령어 예시 | 설명 |
|---|---|---|
| 토픽 생성 | kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first_topic --partitions 3 --replication-factor 1 | 토픽 생성 |
| 토픽 목록 조회 | kafka-topics.sh --bootstrap-server localhost:9092 --list | 전체 토픽 리스트 확인 |
| 토픽 상세 정보 조회 | kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic | 파티션, 리더, ISR 정보 확인 |
| 토픽 삭제 | kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic | 토픽 삭제 |
--partitions N: 병렬 처리 단위--replication-factor N: 복제 계수--describe: 각 파티션의 리더와 복제 상태를 확인💡 주의:
로컬 환경에서는 브로커가 1개인 경우
--replication-factor는 반드시 1이어야 한다.
브로커 개수보다 큰 복제 계수를 설정하면 오류 발생.
Kafka 프로듀서는 데이터를 토픽으로 보낸다.
CLI에서는 kafka-console-producer 명령어로 손쉽게 테스트할 수 있다.
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic first_topic
프로듀서 실행 후 터미널에서 입력하는 각 줄이
Kafka 메시지로 토픽에 전송된다.
Ctrl + C를 누르면 전송을 종료한다.
--producer-property acks=all--producer-property compression.type=gzip--property parse.key=true --property key.separator=:key:value 형태로 입력 가능> user1:login
> user2:logout
> user1:purchase
이렇게 입력하면 : 왼쪽이 키, 오른쪽이 값으로 저장된다.
Kafka 컨슈머는 토픽에서 메시지를 읽어온다.
CLI를 통해 쉽게 데이터를 소비하며 동작을 실습할 수 있다.
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic first_topic
| 옵션 | 설명 |
|---|---|
--from-beginning | 토픽의 처음부터 읽기 |
--group my-group | 컨슈머 그룹 지정 |
--formatter kafka.tools.DefaultMessageFormatter | 출력 포맷 지정 |
--property print.key=true | 메시지 키 표시 |
--property print.partition=true | 파티션 번호 표시 |
--property print.offset=true | 오프셋 번호 표시 |
Partition:0 Offset:1 Key:user1 Value:login
Partition:2 Offset:5 Key:user2 Value:logout
💡 포인트
Kafka는 메시지를 병렬 처리하기 위해
여러 컨슈머를 컨슈머 그룹(Consumer Group) 으로 묶는다.
# 1번 터미널
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test_topic --group my-group
# 2번 터미널 (동일 그룹)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test_topic --group my-group
→ 두 컨슈머가 같은 그룹이라면
Kafka는 파티션을 자동 분배하여 병렬로 읽는다.
| 기능 | 명령어 예시 |
|---|---|
| 그룹 목록 조회 | kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
| 그룹 상세 정보 | kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group |
| 오프셋 재설정 | kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic test_topic --reset-offsets --to-earliest --execute |
💡 오프셋 리셋 전략
| 옵션 | 설명 |
|---|---|
--to-earliest | 토픽 처음부터 다시 읽기 |
--to-latest | 가장 최근 메시지부터 읽기 |
--shift-by -N | N개의 메시지를 뒤로 이동 |
Kafka는 Java SDK(kafka-clients) 를 통해 프로듀서를 작성할 수 있다.
bootstrap.servers, key.serializer, value.serializer)ProducerRecord 생성send() 메서드로 데이터 전송flush() 및 close() 호출로 종료Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("demo_java", "key_1", "hello kafka");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent to partition %d with offset %d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
💡 콜백 활용
onCompletion() 내부에서 metadata.partition()과 metadata.offset()을 통해Kafka Consumer API를 사용하면
토픽 데이터를 실시간으로 읽고 처리할 수 있다.
| 설정 키 | 설명 |
|---|---|
bootstrap.servers | 브로커 주소 |
group.id | 컨슈머 그룹 식별자 |
key.deserializer | 역직렬화 클래스 |
value.deserializer | 역직렬화 클래스 |
auto.offset.reset | 오프셋 없음 시 읽기 시작점 (earliest / latest) |
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-java-consumer");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("demo_java"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
컨슈머는 무한 루프로 작동하므로,
프로그램 종료 시 consumer.wakeup() 을 호출해
안전하게 종료해야 한다.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown detected, closing consumer...");
consumer.wakeup();
try { mainThread.join(); } catch (InterruptedException e) { }
}));
이후 try-catch 블록에서 WakeupException 을 처리하고,
consumer.close() 로 리밸런싱을 정상 종료한다.
| 항목 | CLI 도구 | Java 대응 API |
|---|---|---|
| 토픽 생성/관리 | kafka-topics | AdminClient |
| 메시지 전송 | kafka-console-producer | KafkaProducer |
| 메시지 소비 | kafka-console-consumer | KafkaConsumer |
| 컨슈머 그룹 관리 | kafka-consumer-groups | Consumer API 내부 offset commit |
CLI는 Kafka 구조를 가장 직관적으로 학습할 수 있는 도구이며,
Java SDK는 실무 애플리케이션 통합의 표준 수단이다.
CLI 실습을 통해
토픽 → 프로듀서 → 브로커 → 컨슈머 → 오프셋 흐름을 익힌 후
Java 코드로 재현하면 Kafka의 동작 메커니즘을 완전히 체득할 수 있다.