package com.example.kafkatester;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Properties;
@SpringBootApplication
public class KafkaTesterApplication {
private static String TOPIC_NAME = "hello-kafka";
private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
This is record 0
This is record 1
This is record 2
This is record 3
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
key, value가 제대로 들어 오는지 확인하기 위해서 콘솔 컨슈머를 다음 옵션과 함께 실행
$ kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello-kafka --property print.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
key 값을 넣지 않는 방식으로 레코드 전달 할 경우
null-This is record 0
null-This is record 1
null-This is record 2
카프카가 전달하는 메시지는 레코드라고 한다.
레코드 키는 메시지를 구분하는 구분자 역할을 한다.
레코드 값은 실질적으로 전달하고 싶은 데이터이다.
String, ByteeArray, Int, CSV, TSV, JSON, Object등 사실상 제한 없음
JSON 사용시 key/value 형태로서 확장성이 뛰어남. 컬럼 정보(key) 포함. 또한 디버그시 유리.
CSV 사용시 콤마 기준으로 데이터 구분. 용량 이득.
포맷을 관리하는 다른 방법
@SpringBootApplication
public class KafkaTesterApplication {
...
private static int PARTITION_NUMBER = 1;
public static void main(String[] args) {
...
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
파티션을 지정하기에 순서를 보장할 수 있다.
중요 !
프로듀서에서 데이터 보낼때 얼마나 빠르게 보내고 유실을 허용할 것인지 설정. 레플리카와 연관.
프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주한다.(ack를 받지 않는 것으로 보인다, UDP).
브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없다. 팔로워 파티션에도 저장됬는지 알 수 없음.
전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용
프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 확인. 팔로워 파티션에 저장됬는지는 모름. 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실.
acks = 0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음.
프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인.
리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림. 복제가 완료되기 까지 기다림으로 인해 속도가 느림.
유실 가능성이 없지만, 속도가 느림.