1️⃣ Kafka CLI 기본 개념

Kafka를 설치하면 여러 CLI(Command Line Interface) 도구가 함께 제공된다.
이 도구들을 통해 토픽 생성, 메시지 송수신, 컨슈머 그룹 관리 등
Kafka의 핵심 기능을 직접 제어할 수 있다.

Kafka 3.x 이후 버전에서는
모든 CLI 명령어가 --bootstrap-server 옵션을 사용하도록 통합되었으며,
Zookeeper 기반 옵션(--zookeeper)은 제거되었다.

📌 CLI 명령어 실행 형태

kafka-topics.sh --bootstrap-server localhost:9092 --list

Windows에서는 .bat,
Mac·Linux·WSL2에서는 .sh 확장자를 사용한다.
만약 $PATH 설정이 잘못되었다면 Kafka 바이너리 전체 경로로 접근해야 한다.
예시:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

2️⃣ kafka-topics 명령어: 토픽 관리

Kafka는 데이터를 토픽(Topic) 단위로 저장한다.
CLI를 통해 토픽을 생성·조회·수정·삭제할 수 있다.

기능명령어 예시설명
토픽 생성kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first_topic --partitions 3 --replication-factor 1토픽 생성
토픽 목록 조회kafka-topics.sh --bootstrap-server localhost:9092 --list전체 토픽 리스트 확인
토픽 상세 정보 조회kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic파티션, 리더, ISR 정보 확인
토픽 삭제kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic토픽 삭제

✅ 주요 옵션 정리

  • --partitions N: 병렬 처리 단위
  • --replication-factor N: 복제 계수
  • --describe: 각 파티션의 리더와 복제 상태를 확인

💡 주의:
로컬 환경에서는 브로커가 1개인 경우
--replication-factor는 반드시 1이어야 한다.
브로커 개수보다 큰 복제 계수를 설정하면 오류 발생.


3️⃣ kafka-console-producer: 데이터 전송

Kafka 프로듀서는 데이터를 토픽으로 보낸다.
CLI에서는 kafka-console-producer 명령어로 손쉽게 테스트할 수 있다.

🧩 기본 형태

kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic first_topic

프로듀서 실행 후 터미널에서 입력하는 각 줄이
Kafka 메시지로 토픽에 전송된다.
Ctrl + C를 누르면 전송을 종료한다.

💡 Producer 속성

  • --producer-property acks=all
    → 모든 브로커가 메시지를 복제 후 확인 시 성공으로 간주
  • --producer-property compression.type=gzip
    → 메시지를 압축해 전송 효율 향상
  • --property parse.key=true --property key.separator=:
    key:value 형태로 입력 가능
    → 동일 키는 동일 파티션에 저장됨

🧱 예시

> user1:login
> user2:logout
> user1:purchase

이렇게 입력하면 : 왼쪽이 키, 오른쪽이 값으로 저장된다.


4️⃣ kafka-console-consumer: 데이터 소비

Kafka 컨슈머는 토픽에서 메시지를 읽어온다.
CLI를 통해 쉽게 데이터를 소비하며 동작을 실습할 수 있다.

기본 명령어

kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic first_topic

주요 옵션

옵션설명
--from-beginning토픽의 처음부터 읽기
--group my-group컨슈머 그룹 지정
--formatter kafka.tools.DefaultMessageFormatter출력 포맷 지정
--property print.key=true메시지 키 표시
--property print.partition=true파티션 번호 표시
--property print.offset=true오프셋 번호 표시

예시 출력

Partition:0 Offset:1 Key:user1 Value:login
Partition:2 Offset:5 Key:user2 Value:logout

💡 포인트

  • 하나의 컨슈머 그룹 안에서는 각 파티션을 하나의 컨슈머만 소비.
  • 여러 그룹을 만들면 동일 데이터를 여러 번 병렬로 읽을 수 있다.

5️⃣ 컨슈머 그룹 실습

Kafka는 메시지를 병렬 처리하기 위해
여러 컨슈머를 컨슈머 그룹(Consumer Group) 으로 묶는다.

예시

# 1번 터미널
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test_topic --group my-group

# 2번 터미널 (동일 그룹)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test_topic --group my-group

→ 두 컨슈머가 같은 그룹이라면
Kafka는 파티션을 자동 분배하여 병렬로 읽는다.

컨슈머 그룹 관리 명령어

기능명령어 예시
그룹 목록 조회kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
그룹 상세 정보kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
오프셋 재설정kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic test_topic --reset-offsets --to-earliest --execute

💡 오프셋 리셋 전략

옵션설명
--to-earliest토픽 처음부터 다시 읽기
--to-latest가장 최근 메시지부터 읽기
--shift-by -NN개의 메시지를 뒤로 이동

6️⃣ Java Producer 기본 구조

Kafka는 Java SDK(kafka-clients) 를 통해 프로듀서를 작성할 수 있다.

🔧 주요 단계

  1. 프로듀서 설정 (bootstrap.servers, key.serializer, value.serializer)
  2. ProducerRecord 생성
  3. send() 메서드로 데이터 전송
  4. flush()close() 호출로 종료

🧩 코드 예시

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record =
    new ProducerRecord<>("demo_java", "key_1", "hello kafka");

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.printf("Sent to partition %d with offset %d%n",
            metadata.partition(), metadata.offset());
    } else {
        exception.printStackTrace();
    }
});

producer.flush();
producer.close();

💡 콜백 활용

  • onCompletion() 내부에서 metadata.partition()metadata.offset()을 통해
    메시지 저장 위치 확인 가능.

7️⃣ Java Consumer 기본 구조

Kafka Consumer API를 사용하면
토픽 데이터를 실시간으로 읽고 처리할 수 있다.

🔧 설정 주요 항목

설정 키설명
bootstrap.servers브로커 주소
group.id컨슈머 그룹 식별자
key.deserializer역직렬화 클래스
value.deserializer역직렬화 클래스
auto.offset.reset오프셋 없음 시 읽기 시작점 (earliest / latest)

🧩 코드 예시

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-java-consumer");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("demo_java"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
            record.key(), record.value(), record.partition(), record.offset());
    }
}

8️⃣ 안전한 종료와 셧다운 훅

컨슈머는 무한 루프로 작동하므로,
프로그램 종료 시 consumer.wakeup() 을 호출해
안전하게 종료해야 한다.

final Thread mainThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutdown detected, closing consumer...");
    consumer.wakeup();
    try { mainThread.join(); } catch (InterruptedException e) { }
}));

이후 try-catch 블록에서 WakeupException 을 처리하고,
consumer.close() 로 리밸런싱을 정상 종료한다.


9️⃣ 핵심 정리

항목CLI 도구Java 대응 API
토픽 생성/관리kafka-topicsAdminClient
메시지 전송kafka-console-producerKafkaProducer
메시지 소비kafka-console-consumerKafkaConsumer
컨슈머 그룹 관리kafka-consumer-groupsConsumer API 내부 offset commit

💬 결론

CLI는 Kafka 구조를 가장 직관적으로 학습할 수 있는 도구이며,
Java SDK는 실무 애플리케이션 통합의 표준 수단이다.

CLI 실습을 통해
토픽 → 프로듀서 → 브로커 → 컨슈머 → 오프셋 흐름을 익힌 후
Java 코드로 재현하면 Kafka의 동작 메커니즘을 완전히 체득할 수 있다.

profile
okorion's Tech Study Blog.

0개의 댓글