Schema Registry

Tony·2023년 5월 8일

Kafka

목록 보기
1/1

ProtoBuf/Avro/Json 사용시 메세지 스키마를 중앙 집중적으로 관리할 수 있도록 해줍니다.

Gradle Plugin

  • com.github.imflog.kafka-schema-registry-gradle-plugin // schemaRegistry와의 통신 도구
  • com.github.davidmc24.gradle.plugin.avro // avro -> java 변환 도구

사용법은 해당 github README.MD를 참고하면 편합니다.

Subject Name

unique identifier for schema to retrieve from schema registry.

TopicNameStrategy

  • {topic-name}-value

RecordNameStrategy

  • {class-name}-value

TopicRecordNameStrategy

  • {topic-name}-{class-name}-value

How to do with subject name

  1. Set the correct subject name for the strategy you are willing to use.
  2. Configure the clients the strategy
    • key.subject.name.strategy
    • value.subject.name.strategy

Schema Register

Schema를 Schema Registry에 저장하면, 해당 schema unique ID와 버전 번호를 부여합니다. 버전 번호는 1부터 incremental하게 증가합니다. Schema를 업데이트하면, id 값이 새로 부여되고 버전 번호가 증가합니다.

How to register schema into schema registry

<command line>

jq '. | {schema: tojson}' src/main/avro/payment.avsc | \
curl -u API_KEY:API_SECRET \
	 -X POST http://localhost:8081/subjects/payment-value/versions \
     -H "Content-Type:application/json" \
     -d @-
<build.gradle>

register {
	subject('payment-value', src/main/avro/payment.avsc', 'AVRO')
}

./gradlew registerSchemasTask

How to do with Spring-kafka

Just put KafkaAvroDeserializer.class into Consumer and Producer Config

configs.put(ProducerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <Deserializer_Class>);

Schema Compatibility

Type

BACKWARD

Consumers using the new version can read data produced with the previous version.
새로운 버전에 기존 필드값을 제거할 수 있고, 추가하려는 필드값에 기본값을 명시하면 필드 추가도 가능합니다. Consumer가 이전 버전의 Data를 볼 수 있는 모드이므로, Consumer부터 업데이트하고 Producer을 업데이트한다면 아무 문제없이 작동합니다.
예를 들어, 다음과 같이 변경할 수 있습니다.

Old version
{
	field1: Long,
    field2: Integer
}
New version
{
	field3: String default = "Kim",
    fiedl2: Integer
}

Producer가 old version을 사용하고, Consumer가 new version을 사용하고 있다고 가정해봅시다.
이때, Producer가 보낸 메세지에서는 field1이 사라지고 field3는 default로 설정된 상태로 역직렬화되어 Consumer가 메세지를 받습니다.

FORWARD

Consumers using the previous schema version can read data produced with the new version.
BACKWARD와는 반대입니다. Producer부터 새로운 버전의 Schema로 업데이트한 후에, Consumer를 업데이트해야합니다.
새로운 버전에 기존 필드값을 제거할 수 있고, 추가하려는 필드값에 기본값을 명시하면 필드 추가도 가능하다는 점은 BACKWARD와 같습니다.
Producer가 새로운 버전의 스키마를 사용하고, Consumer가 이전 버전의 스키마를 사용한다고 가정해봅시다. 메세지는 새로운 버전에서 제거된 필드는 기본값, 새로운 필드는 무시된 채로 역직렬화되어 Consumer에게 전달됩니다.

FULL

Both way around above.
양쪽으로 Compatibility가 보장되므로 업데이트 순서는 상관없습니다.

NONE

No compatibility check

BACKWARD_TRANSITIVE

Consumers using the new schema version can read data produced with "all" previous versions
BACKWARD와 원리가 같지만, "모든 이전" 버전에 작용하는 것이 다릅니다.

FORWARD_TRANSITIVE

Consumers using the old schema version can read data produced with all later versions
FORWARD와 원리가 같지만, "모든 이후" 버전에 작용하는 것이 다릅니다.

FULL_TRANSITIVE

Both way around of above two.

How to Set Compatibility Level

<build.gradle>

schemaRegistry {
	url = 'http://localhost:8081'
    
    config {
    	subject('payment-value', 'FORWARD')
    }
}
<Schema Registry REST command>

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json"\
--data '{"compatibility": "FULL"}'\
http://localhost:8081:config/payment-value

How to Check a Schema for Compatibility

<build.gralde>

schemaRegistry {
	url = 'http://localhost:8081'
    
    compatibility {
    	subject('payment-value', src/main/avro/payment.avro, 'AVRO')
    }
}
jq '. | {schema: tojson}' src/main/avro/payment.avsc | \
curl -u API_KEY:API_SECRET \
	 -X POST http://localhost:8081/subjects/payment-value/version/latest \
     -H "Content-Type:application/json" \
     -d @-

jq는 커맨드라인 json 프로세서입니다. 위의 커맨드의 의미는
. : 입력값을 표준입력(stdin)에서 읽는다는 의미입니다.
| : 파이프라인입니다. 입력값을 다음 커맨드에 넘깁니다.
{schema: tojson} : 입력값이 tojson 함수를 통해 json으로 변경되어 key가 schema, value가 변환된 값으로 설정되어 json이 만들어집니다.
src/main/avro/payment.avsc : 해당 경로에 있는 입력 파일이 입력으로 주어집니다.

0개의 댓글