
Schema Registry는 데이터 형식(스키마)을 중앙에서 저장하고 관리하는 서비스입니다.
프로듀서와 컨슈머 모두 Schema Registry에 등록된 스키마를 사용하면 데이터 형식이 변경되더라도 일관성과 호환성을 유지할 수 있습니다.
주문개발팀에서 배송개발팀으로 주문에 대한 데이터를 카프카로 전달하고 있다고 가정해 봅시다. Schema Registry를 사용하지 않아 데이터에 대한 정의는 각자의 모듈에 클래스로 구현되어 있는 상태입니다.
기존에 주문개발팀은 구매자 아이디라는 값을 buyerId라는 변수로 정의해 토픽에 데이터를 적재하고 있고, 배송개발팀도 여기에 맞춰 로직이 구현되어 있습니다.
어느날 주문개발팀에서 구매자 아이디라는 값을 purchaserId로 변경하면 어떻게 될까요? 기존의 배송개발팀 시스템은 에러가 발생하며, 배송개발팀은 문제를 해결하기 위해 코드를 변경해야 합니다.
Schema Registry는 이러한 문제를 방지합니다. 주문개발팀과 배송개발팀이 각자 데이터를 정의하지 않고 중앙에 저장된 스키마를 동일하게 바라봄으로써 위와 같은 문제가 발생하지 않게 됩니다.
서비스를 운영하면서 데이터 형식은 변하기 마련이고 Schema Registry는 이러한 변화를 관리하기 위한 목적으로 만들어졌습니다. 각각의 스키마는 변화에 대한 버전을 가지며, Schema Registry는 compatibility type라는 옵션을 통해 이러한 변화를 어떻게 처리할지 정의하고 있습니다.
| Compatibility Type | 허용되는 변경 사항 | 비교 대상 스키마 | 먼저 업그레이드할 대상 |
|---|---|---|---|
| BACKWARD | 필드 삭제, 선택적 필드 추가 | 마지막 버전 | 컨슈머 |
| BACKWARD_TRANSITIVE | 필드 삭제, 선택적 필드 추가 | 모든 이전 버전 | 컨슈머 |
| FORWARD | 필드 추가, 선택적 필드 삭제 | 마지막 버전 | 프로듀서 |
| FORWARD_TRANSITIVE | 필드 추가, 선택적 필드 삭제 | 모든 이전 버전 | 프로듀서 |
| FULL | 선택적 필드 추가, 선택적 필드 삭제 | 마지막 버전 | 순서 상관없음 |
| FULL_TRANSITIVE | 선택적 필드 추가, 선택적 필드 삭제 | 모든 이전 버전 | 순서 상관없음 |
| NONE | 모든 변경 허용 | 호환성 검사 비활성화 | 상황에 따라 다름 |
컨플루언트는 기본적으로 스키마 레지스트리를 사용함.
스키마 레지스트리만 세팅하는 게 아니라 컨플루언트 전체를 실행해 스키마 레지스트리를 실습해보겠다.
Confluent 세팅에 대한 자세한 방법은 Confluent 공식문서에서 확인하실 수 있습니다.
Docker Desktop을 설치한 뒤 docker-compose.yml파일을 실행해 주면 됩니다. Confluent에서 제공하는 docker-compose.yml파일을 다운받은 뒤 아래 명령어로 해당 파일을 실행합니다.
docker compose up -d
설치가 완료되면 아래와 같이 여러 컨테이너들이 실행되어 있는걸 확인하실 수 있습니다.

Confluent 세팅이 완료됐다면 localhost:9021에 접속했을 때 아래와 같은 화면이 보이게 됩니다.

이 예제는 공식문서의 예제를 참고했으며, 이전 글인 Avro 간단 실습의 내용을 활용합니다.
토픽을 사이에 두고 avro로 선언한 데이터를 주고받아 봅시다. 이때 Confluent SchemaRegistry에 avro타입을 저장하고 이를 활용합니다.
transactions토픽에 avro로 선언한 Payment형태의 데이터를 전달합니다.
@Configuration
public class KafkaProducerConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
configProps.put("schema.registry.url", SCHEMA_REGISTRY_URL);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
@Component
@RequiredArgsConstructor
public class MyProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private static final String TRANSACTION_TOPIC = "transactions";
public void sendMessage(String id, Double amount) {
Payment payment = new Payment(id,amount);
kafkaTemplate.send(TRANSACTION_TOPIC, payment);
}
}
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final MyProducer myProducer;
@GetMapping("/transactions/{id}/{amount}")
public void produce(@PathVariable("id") String id, @PathVariable("amount") Double amount) {
myProducer.sendMessage(id,amount);
}
}
http://localhost:8080/transactions/1/100를 호출합니다.
transactions라는 토픽이 생성됐습니다.

transactions토픽은 우리가 정의한 Payment avro형태로 스키마가 정의되어 있습니다.

http://localhost:8081/schemas를 통해 스키마 레지스트리에 Payment가 등록됐음을 확인할 수 있습니다.

transactions토픽의 데이터를 읽어옵니다.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
private static final String GROUP_ID = "transaction-group";
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
configProps.put("schema.registry.url", SCHEMA_REGISTRY_URL);
configProps.put("specific.avro.reader", false);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Consumer에서 스키마 레지스트리에 등록된 스키마 정보를 가져오기 위해 SchemaRegistryClient를 사용할 예정입니다. Consumer에 적용하기 전에 SchemaRegistryClient의 기능을 간단히 살펴봅시다.
@SpringBootTest
class MyConsumerTest {
private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
private final SchemaRegistryClient schemaRegistryClient =
new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 10);
@Test
@DisplayName("스키마 레지시트리에 등록된 스키마이름을 모두 가져옵니다")
void getAllSubjects() throws Exception{
Collection<String> allSchemaNames = schemaRegistryClient.getAllSubjects();
for (String schema : allSchemaNames) {
System.out.println("schema = " + schema); // allSubject = transactions-value
}
}
@Test
@DisplayName("이름과 버전을 통해 특정 스키마를 찾습니다")
void getByVersion(){
Schema schema = schemaRegistryClient.getByVersion("transactions-value", 1, false);
System.out.println("schema = " + schema);
}
@Test
@DisplayName("이름을 통해 가장 최신 버전의 스키마를 찾습니다")
void getLatestSchemaMetadata() throws Exception{
SchemaMetadata latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata("transactions-value");
String schema = latestSchemaMetadata.getSchema();
System.out.println("schema = " + schema);
}
}
@Component
public class MyConsumer {
private static final String TRANSACTION_TOPIC = "transactions";
private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
private final SchemaRegistryClient schemaRegistryClient =
new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 10);
@KafkaListener(topics = TRANSACTION_TOPIC)
public void consume(ConsumerRecord<String,GenericRecord> record) throws Exception{
GenericRecord value = record.value();
Schema schema = schemaRegistryClient.getByVersion("transactions-value", 1, false);
List<String> fields = parseJsonAndGetFieldNames(schema.getSchema());
System.out.println("========");
for (String field : fields) {
System.out.println(field + ": " + value.get(field));
}
System.out.println("========");
}
private List<String> parseJsonAndGetFieldNames(String schema) throws Exception{
ObjectMapper objectMapper = new ObjectMapper();
JsonNode schemaNode = objectMapper.readTree(schema);
JsonNode fieldsNode = schemaNode.path("fields");
List<String> fieldNames = new ArrayList<>();
for (JsonNode field : fieldsNode) {
String fieldName = field.path("name").asText();
fieldNames.add(fieldName);
}
return fieldNames;
}
}
