name=alert-source
connector.class=FileStreamSource
tasks.max=1 # 독립 실행형으로 테스트
file=alert.txt # 이 파일을 모니터링
topic=kinaction_alert_connect # 데이터가 전송될 토픽
하기 전에 Kafka Connector를 설치해야한다.
https://docs.confluent.io/platform/7.5/connect/quickstart.html
bin/connect-standalone.sh config/connect-standalone.properties config/alert-standalone.properties

name=alert-sink
connector.class=FileStreamSink
tasks.max=1 # 독립 실행형으로 테스트
file=alert-sink.txt # 이 파일을 모니터링
topic=kinaction_alert_connect # 데이터가 전송될 토픽
bin/connect-standalone.sh config/connect-standalone.properties config/alert-standalone.properties alert-sink.properties

--fom-beginning으로토픽 처음부터 소비를 할수있다{
"type": "record",
"name": "kinaction_libraryCheckout",
"fields": [{...}]
}
.... 중략 ....
//직렬화
properties.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer")
//역직렬화
properties.put("value.deserializer","io.confluent.kafka.serializers.KafkaAvroDeserializer")

설정
plugins { kotlin("jvm") version "1.9.0" id("com.github.davidmc24.gradle.plugin.avro") >version "1.9.1" } dependencies { implementation("org.apache.avro:avro:1.11.3") implementation("io.confluent:kafka-avro-> serializer:7.5.1") } avro { isCreateSetters.set(true) isCreateOptionalGetters.set(false) isGettersReturnOptional.set(false) isOptionalGettersForNullableFieldsOnly.set(false) fieldVisibility.set("PUBLIC") outputCharacterEncoding.set("UTF-8") stringType.set("String") templateDirectory.set(null as String?) isEnableDecimalLogicalType.set(true) } tasks.withType(JavaCompile::class).configureEach > { options.encoding = "UTF-8" } tasks { register("generateAvro", > GenerateAvroJavaTask::class){ source("src/avro") setOutputDir(file("src/main/avro")) } }