bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
위 명령어로 브로커 정보(버전, 옵션 등)들을 확인할 수 있다.
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
producer.flush();
producer.close();
}
주요한 클래스는 KafkaProducer과 ProducerRecord이다.
KafkaProducer를 생성할 때는
- 브로커 정보
- 레코드 키 직렬화 방식
- 레코드 값 직렬화 방식
이 필수 설정으로 들어가며, Java에서는 Properties 클래스를 사용한다.
즉시 전송이 아니라 모아뒀다가 배치로 전송한다.
버퍼에 존재하는 레코드를 모두 전송
producer 인스턴스 리소스 종료
RecordMetadata metadata = producer.send(record).get();
Future 클래스의 get()을 호출하고 RecordMetadata로 확인할 수 있다.
public class ProducerCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null)
logger.error(e.getMessage(), e);
else
logger.info(recordMetadata.toString());
}
}
Callback 인터페이스를 구현함으로써 콜백함수를 작성한다.
그리고
producer.send(record, new ProducerCallback());
producer에서 전송할 때, 해당 콜백 클래스를 같이 넘겨준다.
어느 방식이든 보낸 레코드의 파티션 정보와 오프셋 정보를 알 수 있다.
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
}
}
주요한 클래스는 KafkaConsumer과 ConsumerRecord이다.
KafkaConsumer 생성할 때는
- 브로커 정보
- 레코드 키 역직렬화 방식
- 레코드 값 역직렬화 방식
이 필수 설정으로 들어가며, Producer와 마찬가지로 정보를 전달할 때 Properties 클래스를 사용한다.
consumer.subscribe(Arrays.asList(TOPIC_NAME));
컨슈머에 토픽을 할당한다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
데이터를 가져온다. 인자로 들어가는 내용은 컨슈머 버퍼에 데이터를 기다리기 위한 시간.
consumer.commitSync();
poll 메소드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
커밋이 완료될 때까지 기다린다.
consumer.commitAsync();
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
// 필요 작업 처리
}
}
정상적으로 커밋이 되었을 때, 콜백함수를 사용하여 후처리를 할 수도 있다.
ConsumerRebalanceListener 인터페이스의 onPartitionAssigned()와 onPartitionRevoked()를 이용
직접 파티션을 지정하여 할당할 때는 assign()을 이용한다.
정상적으로 종료되지 않을 경우, 세션 타임아웃이 발생할 때 까지 컨슈머 그룹에 남을 수 있다. 이 경우 컨슈머 랙이 늘어날 수 있다.
wakeup() 메소드를 통해 안전하게 종료할 수 있다.
셧다운 훅을 이용해 kill-TERM {프로세스 번호}를 호출함으로써 셧다운 훅을 발생시킨다.
그리고 이후에 close()를 통해 안전하게 종료되었음을 명시적으로 알려준다.
소스코드에서 AdminClient 클래스를 이용해 브로커 정보를 알아낼 수 있다.
Propertites configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(configs);