Apache Kafka와 JAVA를 연동하는 간단한 예제 코드를 알아보자.
public class ProducerJAVA {
// 로그 설정
private static final Logger log = LoggerFactory.getLogger(ProducerDemoKeys.class.getSimpleName());
public static void main(String[] args) {
// create Producer Properties
Properties properties = new Properties();
// 부트스트랩 서버 설정
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // properties.setProperty( "bootstrap.servers", "127.0.0.1:9092"); 로도 설정 가능
// Key serializer 설정
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // properties.setProperty("key.serializer", StringSerializer.class.getName()); 로도 설정 가능
// Value serializer 설정
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // properties.setProperty("value.serializer", StringSerializer.class.getName()); 로도 설정 가능
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String topic = "demo_java"; // 토픽 설정
for (int j = 0; j < 3; j++) {
for (int i = 0; i < 10; i++) {
String key = "id_" + i; // 키 설정 (생략 가능)
String value = "hello world " + i; // value 설정
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record,
(metadata, exception) -> { // 메시지 전송 callback
if(exception == null) {
log.info("key: {}", key);
log.info("partition: {}", metadata.partition());
} else {
log.error("Error!! ", exception);
}
});
}
}
producer.flush(); // 카프카 메세지 전송은 비동기 방식이기 때문에 플러시 안해주면 전송하기 전에 프로그램 끝나버림
producer.close();
}
}
public class ConsumerJAVA {
// 로그 설정
private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());
public static void main(String[] args) {
String groupId = "my-java-application"; // 컨슈머 그룹 ID 설정
String topic = "wikimedia.recentchange.connect"; // 토픽 설정
// create Consumer Properties
Properties properties = new Properties();
// 부트스트랩 서버 설정
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Key deserializer 설정
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
// value deserializer 설정
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
// 컨슈머 그룹 ID 설정
properties.setProperty("group.id", groupId);
// offset reset 설정
// latest -> 지금부터 보낸 메세지만 읽겠다 / earliest -> 현재 컨슈머 그룹의 커밋된 오프셋을 찾지 못할 때 가장 처음의 오프셋부터 메시지를 읽겠다
properties.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic)); // 구독할 토픽 설정. 복수 개의 토픽 설정 가능함.
// 메시지 폴링 루프
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 최대 1000 밀리초만큼 기다리면서 브로커로부터 메시지를 가져옴
for (ConsumerRecord<String, String> record : records) {
log.info("Key: {} , Value: {} ", record.key(), record.value());
log.info("Partition: {} , Offset: {} ", record.partition(), record.offset());
}
}
}
}