Kafka Connect
에 source connector
나 sink connector
을 연결했다고 해보자. 컨넥터와 카프카 브로커 사이에 주고 받는 메시지를 어떻게 변환하여 저장할 것인지 역할을 수행하는 것이 Converter
이다.
몽고DB에서 제공하는 source connector
는 키 스키마
와 값 스키마
의 두 가지 유형의 스키마를 default로 제공한다. 그래서 mongo source connector
는 값과 키가 모두 포함된 메시지를 default 스키마 형식에 맞추어 Apache Kafka
로 보낸다.(이전 게시글을 통해 메시지를 직접 확인해보자)
default 스키마 코드: https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java
링크타고 들어가보면 Avro 스키마를 이용해 default 스키마를 정의한 모습을 확인할 수 있다.
키 스키마
는 Apache Kafka
로 전송되는 메시지의 키 구조를 적용한다.
값 스키마
는 Apache Kafka
로 전송되는 메시지의 값 구조를 적용한다.
컨버터는 유연하게 선택하여 사용할 수 있도록 설계되어 있다. 사용할 수 있는 컨버터는 Avro Converter, Protobuf Converter 등 다양하게 존재한다.
이전 게시글에 이어 실습을 진행해보았다.
이번 시간에는 Kafka Connect
의 mongodb source connector
에 Avro Converter
와 스키마 레지스트리를 연동하고 컨넥트에서 사용한 스키마를 스키마 레지스트리에 등록하고, 카프카 스트림즈
를 이용해 컨슈머쪽에서 Avro
스키마로 메시지를 받아 처리하는 작업을 진행해본다.
이전 시간에서 넘어왔다고 해보자 굉장히 간단하다.
Avro Converter
를 사용한다고 표기하고 스키마 레지스트리 주소를 넣어주면 된다.
{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "example-stream",
"collection": "product",
// 다음 내용이 추가됨
"output.format.value": "schema",
"output.format.key": "schema",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
다음과 같이 json을 업데이트하고 kafka connect에 connector를 다시 등록해주자
먼저 Change Event
를 감지하기 위해 몽고디비에 변화를 준다.
그 다음 몽고디비 컨테이너 내부에서 다음 명령어를 실행하여 스키마 레지스트리쪽으로 API Call을 날린다.
curl schema-registry:8081/subjects
다음과 같이 스키마 레지스트리에 connector가 스키마를 등록한 것 확인이 가능하다.
["example-stream.product-key","example-stream.product-value"]
curl schema-registry:8081/subjects/example-stream.product-value/versions
조회해보니 버전1이 존재한다.
버전을 넣어 api call 한다.
curl schema-registry:8081/subjects/example-stream.product-value/versions/1
응답내용
{
"subject":"example-stream.product-value",
"version":1,
"id":2,
"schema":"{\"type\":\"record\",\"name\":\"ChangeStream\",\"fields\":[{\"name\":\"_id\",\"type\":\"string\"},{\"name\":\"operationType\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fullDocument\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ns\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ns\",\"fields\":[{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"coll\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"ns\"}],\"default\":null},{\"name\":\"to\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"to\",\"fields\":[{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"coll\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"to\"}],\"default\":null},{\"name\":\"documentKey\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"updateDescription\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"updateDescription\",\"fields\":[{\"name\":\"updatedFields\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"removedFields\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}],\"connect.name\":\"updateDescription\"}],\"default\":null},{\"name\":\"clusterTime\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txnNumber\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"lsid\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"lsid\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"uid\",\"type\":\"string\"}],\"connect.name\":\"lsid\"}],\"default\":null}],\"connect.name\":\"ChangeStream\"}"
}
토픽을 까보자. 무언가 Avro 형식으로 메시지가 브로커에 저장된듯하다.
Kafka Connect 쪽에서 자신이 사용한 Avro 스키마를 스키마 레지스트리에 적재해주었음을 위에서 확인하였다.
이번 섹션에서는 해당 Avro 스키마를 스키마 레지스트리로부터 받아 메시지를 컨슘하여 kafka streams로 메시지를 처리해본다.
코틀린을 사용하였다!
repositories {
maven {
url = uri("https://packages.confluent.io/maven/")
}
}
dependencies {
implementation(project(":diff-checker:diff-checker-enum"))
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
// 카프카 브로커 지원하는지 버전 확인// https://mvnrepository.com/artifact/org.apache.kafka/connect-json
implementation("org.apache.kafka:kafka-clients:3.4.0")
implementation("org.apache.kafka:kafka-streams:3.4.0")
// 카프카 버전 유의
implementation("io.confluent:kafka-streams-avro-serde:7.3.2")
implementation("io.confluent:kafka-schema-registry-client:7.3.2")
implementation("io.confluent:kafka-avro-serializer:7.3.2")
testImplementation("org.apache.kafka:kafka-streams-test-utils:3.4.0")
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation("org.slf4j:slf4j-api:2.0.7")
implementation("ch.qos.logback:logback-classic:1.4.6")
}
package com.example.event.diffchecker
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import java.util.*
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.avro.generic.GenericRecord
/**
* 카프카 스트림즈로 Avro Schema를 GenericRecord으로 받아 처리하는 예제
*/
fun main(args:Array<String>) {
// env
val applicationName = "mongo-diff-checker-app"
val bootstrapServers = "localhost:9092"
val productStreamTopic = "example-stream.product"
val upsertProductTopic = "example-stream.product.upsert"
val deleteProductTopic = "example-stream.product.delete"
val schemaRegistryUrl = "http://localhost:8081"
// Generic Avro Serde for a generic Avro record type
val serdeConfig = mapOf(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl
)
val genericAvroRecordSerde = GenericAvroSerde()
genericAvroRecordSerde.configure(serdeConfig, false)
// streams props 설정
val props = Properties()
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = genericAvroRecordSerde::class.java
// stream builder 설정
val builder = StreamsBuilder()
val productStream: KStream<String, GenericRecord> = builder.stream(productStreamTopic)
productStream.filter { key, value ->
val operationType = value.get("operationType")
operationType.toString() == "insert" || operationType.toString() == "replace" || operationType.toString() == "update"
}.to(upsertProductTopic)
productStream.filter { key, value ->
val operationType = value.get("operationType")
operationType.toString() == "delete"
}.to(deleteProductTopic)
// topology 구성
val streams = KafkaStreams(builder.build(), props)
// start
streams.start()
}
GenericRecord
는 File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다. 이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다. 이 단점을 보완하려면 SpecificRecord
를 써서 구현해야한다.
{"_id": "{\"_data\": \"8264200CC6000000012B022C0100296E5A100448D3A76BAF3249C2ABF0BA0B98ABFFA3463C5F6964003C61626A6A636B6B646A6A6B686B000004\"}", "operationType": "insert", "fullDocument": "{\"_id\": \"abjjckkdjjkhk\", \"mallId\": \"애플jk\", \"title\": \"매ㄱ북sjjdasd\", \"content\": \"aaa\", \"price\": \"1000\", \"imageUrl\": \"http://aa.com/imgae\", \"_class\": \"com.example.event.productcollector.api.product.domain.Product\"}", "ns": {"db": "example-stream", "coll": "product"}, "to": null, "documentKey": "{\"_id\": \"abjjckkdjjkhk\"}", "updateDescription": null, "clusterTime": "{\"$timestamp\": {\"t\": 1679822022, \"i\": 1}}", "txnNumber": null, "lsid": null}
GenericRecord
로 받은 value
를 println
으로 찍어 확인해보면 이렇게 의도대로 잘 나오는 것을 확인 가능하다.
게시글 2개에 걸쳐서 전반적으로 Kafka Connect
, Kafka Streams
, Schema Registry
, Avro Schema
등을 사용해보며 카프카에 대해 전반적인 기술 습득을 할 수 있었으며 CDC
, 이벤트 스트리밍 아키텍처
에 대해 이해하게 되었다.
https://www.mongodb.com/docs/kafka-connector/current/introduction/converters/