일반화
- 홈 엔터테인먼트 시스템에 있는 수신기
- 데이터 스트림을 처리하고 다른 쪽 끝단에 연결된 외부 장치에 적합한 형식으로 보낸다.
- 직원이 유급 휴가를 신청하는 데 사용하는 HR 시스템을 예로 들어보자.
# Start the ZooKeeper server
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka server cluster (one broker per properties file: server0, server1, server2)
bin/kafka-server-start.sh config/server0.properties
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties
# Create the kinaction_helloworld topic with 3 partitions and a replication factor of 3
bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --topic kinaction_helloworld --partitions 3 --replication-factor 3
# List the topics known to the cluster reached through localhost:9094
bin/kafka-topics.sh --list --bootstrap-server localhost:9094
# Describe the kinaction_helloworld topic
bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic kinaction_helloworld
# Start a console producer that writes to kinaction_helloworld
bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic kinaction_helloworld
# Start a console consumer on kinaction_helloworld, reading from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic kinaction_helloworld --from-beginning
// Snippet: send an Alert-keyed record to the kinaction_alert topic.
// `producer` is declared outside this snippet; the Alert constructor arguments are elided.
Alert alert = new Alert(...);
// The record is typed <Alert, String>, so the key must be the Alert object itself,
// not the String literal "alert"; the value is the alert's message text.
ProducerRecord<Alert, String> pr =
    new ProducerRecord<>("kinaction_alert", alert, alert.getAlertMessage());
producer.send(pr, new AlertCallback()); // a callback can optionally be passed to send()
producer.close();
// Snippet: consume kinaction_audit and synchronously commit the offset after each record.
// `consumer`, `keepConsuming` and `log` are declared outside this snippet.
consumer.subscribe(List.of("kinaction_audit")); // subscribe to the topic
while (keepConsuming) {
    var records = consumer.poll(Duration.ofMillis(250));
    for (ConsumerRecord<String, String> record : records) {
        log.info("kinaction_info offset = {}, kinaction_value = {}", record.offset(), record.value());

        // The committed offset is the position of the NEXT record to read, hence offset + 1.
        // (record.offset() is a method result and cannot be incremented with ++.)
        OffsetAndMetadata offsetMeta = new OffsetAndMetadata(record.offset() + 1, "");

        Map<TopicPartition, OffsetAndMetadata> kaOffsetMap = new HashMap<>();
        kaOffsetMap.put(new TopicPartition("kinaction_audit", record.partition()), offsetMeta);

        // Commit only the partition this record came from.
        consumer.commitSync(kaOffsetMap);
    }
}
public class HelloWorldProducer{
public static main(String[] args){
Properties kaProperties = new Properties();
kaProperties.put("bootstrap.servers", "localhost:9092", "localhost:9093", "localhost:9094");
kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(kaProperties))
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kinaction_helloworld", null, "hello world again");
producer.send(producerRecord)
}
}
public class HelloWorldConsumer{
...
public static void main(String[] args){
Properties kaProperties = new Properties();
kaProperties.put("bootstrap.servers", "localhost:9092", "localhost:9093", "localhost:9094")
kaProperties.put("group.id","kinaction_helloconsumer");
kaProperties.put("enable.auto.commit","true");
kaProperties.put("auto.commit.interval.ms","1000");
kaProperties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kaProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
HelloWorldConsumer helloWorldConsumer = new HelloWorldConsumer();
helloWorldConsumer.consume(kaProperties);
Runtime.getRuntime().addShutdownHook(new Thread(helloWorldconsumer::shutdown));
}
private void consume(Properties kaProperties){
try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kaProperties)){
consumer.subscribe(List.of("kinaction_helloworld"));
while(keepConsuming){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));
...
}
}
}
}