[Confluent] Transformation 을 활용한 message key 변경.

김지환·2023년 7월 16일
0

TL;DR

kafka는 message 의 key 값이 정해져 있지 않다면 round-robin 방식으로 각 파티션에 데이터들이 분배되게 된다. 이렇게 되면 데이터의 순서 보장이 되지 않을 수 있는데 이런 경우 message의 key를 지정해준다면, 해당 key 값에 해당하는 partition으로 분배되게 된다.

kafka connect 의 개념중 하나인 transformation을 사용해서 전달되는 record들의 key 값을 설정해줄 수 있는다. 이 밖에도 message의 schema를 제거하거나 predefined돼 있는 field를 제거하는 등의 작업을 통해서 message를 좀 더 간단하게 만들어 줄 수 있다. 뿐만 아니라 필요하다면 field를 추가해서 사용도 가능하다.

Transformation

각각의 제공자들 별로 다양한 transforms 기능들이 있다. 각 제공자들은 본인들의 connect에서 사용할 수 있는 transforms에 대해서 documentation 해놓고 사용법들을 기재해 놓고 있다.

보통 transforms ( 필자가 사용해본 connector configuration에 한해서 ) 들은 다음과 같은 방식으로 정의한다.

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: test-connector
  namespace: confluent
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  ...
  transforms: "createKey,extractInt"
  transforms.createKey.type: "org.apache.kafka.connect.transforms.ValueToKey"
  transforms.createKey.fields: "id"
  transforms.extractInt.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
  transforms.extractInt.field: "id"

transforms key 에 string + comma 조합으로 복수개의 transforms의 이름을 짓고 각 transforms 별로 속성을 설정해주는 방식이다.

아래는 debezium PostgresConnector의 예시이다.

  "transforms": "unwrap",
  "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones":false,
  "transforms.unwrap.delete.handling.mode":"rewrite"

위와 같은 방식으로 내가 원하는 record를 만들 수가 있다.

org.apache.kafka.connect.transforms

자 그럼 서론에서 이야기했던 message key를 특정 field 의 value값으로 만드는 방법을 보자

record의 key 값을 설정하는 방법으로는 ValueToKey, ExtractField$Key 를 활용하면 된다.

  • ValueToKey: 해당 Fields 의 value 값을 message key 로 만든다.
  • ExtractField$Key: 아무 설정을 하지 않고 ValueToKey를 사용하면 Struct(blah) 와 같은 방식으로 key 값이 형성되게 된다. primitive type으로 설정을 하기 위해서는 해당 Struct 구조에서 value 값을 뽑아줘야하는데 그 때 해당 transformation을 사용한다.
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: test-connector
  namespace: confluent
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  taskMax: 1
  configs:
    tasks.max: "1"
    name: postgresql-user-test
    connector.class: "io.confluent.connect.jdbc.JdbcSourceConnector"
    connection.user: "postgres"
    connection.password: "y4c9GPpnJT"
    connection.url: "jdbc:postgresql://postgresql.default.svc.cluster.local:5432/postgres"
    schema.pattern: "public"
    topic.prefix: "postgresql-01-"
    topic.creation.default.replication.factor: "-1"
    topic.creation.default.partitions: "1"
    min.insync.replicas: "1"
    table.whitelist: "user"
    poll.interval.ms: "10000"
    mode: "bulk"
    acks: "all"
    transforms: "createKey,extractInt"
    transforms.createKey.type: "org.apache.kafka.connect.transforms.ValueToKey"
    transforms.createKey.fields: "id"
    transforms.extractInt.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.extractInt.field: "id"
profile
Developer

0개의 댓글