//커스텀 플러그인
package com.example.kafka.connect.transforms;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Base64DecodingTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(Base64DecodingTransformation.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public R apply(R record) {
if (record.value() == null) {
return record;
}
try {
// Step 1: Get the value as string
String value;
if (record.value() instanceof byte[]) {
value = new String((byte[]) record.value(), StandardCharsets.UTF_8);
log.info("Value converted from byte[]: {}", value);
} else if (record.value() instanceof String) {
value = (String) record.value();
log.info("Value is already string: {}", value);
} else {
throw new DataException("Unsupported value type: " + record.value().getClass());
}
// Step 2: Remove quotes and decode Base64
value = value.trim().replaceAll("^\"|\"$", "");
log.info("Value after removing quotes: {}", value);
byte[] decodedBytes = Base64.getDecoder().decode(value);
String decodedString = new String(decodedBytes, StandardCharsets.UTF_8);
log.info("Decoded string: {}", decodedString);
// Step 3: Parse JSON
JsonNode jsonNode = objectMapper.readTree(decodedString);
log.info("JSON content: {}", jsonNode.toString());
log.info("Has deviceId: {}", jsonNode.has("deviceId"));
log.info("DeviceId field: {}", jsonNode.get("deviceId"));
String id = jsonNode.get("id").asText();
String deviceId = jsonNode.get("deviceId").asText();
String data = decodedString; // 전체 JSON을 data 컬럼에 저장
log.info("Parsed values - id: {}, deviceId: {}", id, deviceId);
// Step 4: Create schema and struct
Schema schema = SchemaBuilder.struct()
.field("id", Schema.STRING_SCHEMA)
.field("deviceId", Schema.STRING_SCHEMA)
.field("data", Schema.STRING_SCHEMA)
.build();
Struct struct = new Struct(schema)
.put("id", id)
.put("deviceId", deviceId)
.put("data", data);
log.info("Created Struct: {}", struct);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
schema,
struct,
record.timestamp()
);
} catch (Exception e) {
log.error("Error processing record", e);
throw new DataException("Failed to process record: " + e.getMessage(), e);
}
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
//pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.kafka.connect</groupId>
<artifactId>base64-decode-transform</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<kafka.version>3.7.0</kafka.version>
<jackson.version>2.12.7</jackson.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.20.0</log4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Kafka Connect API -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<!-- Maven Shade Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- Service Transformer for Kafka Connect Plugins -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.kafka.connect.transforms.Transformation</resource>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<!-- Maven Clean Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
</plugins>
</build>
</project>
위의 java파일을 빌드한후, base64-decode-transform-1.0-SNAPSHOT.jar 로 만들어 아래 plugin 폴더에 넣습니다.
├── CHANGELOG.md
├── CONTRIBUTE.md
├── COPYRIGHT.txt
├── FastInfoset-1.2.16.jar
├── LICENSE-3rd-PARTIES.txt
├── LICENSE.txt
├── README.md
├── README_JA.md
├── README_KO.md
├── README_ZH.md
├── angus-activation-2.0.0.jar
├── annotations-2.22.12.jar
├── antlr4-runtime-4.10.1.jar
├── auth-2.22.12.jar
├── avro-1.11.4.jar
├── aws-core-2.22.12.jar
├── aws-json-protocol-2.22.12.jar
├── aws-msk-iam-auth-1.1.5.jar
├── base64-decode-transform-1.0-SNAPSHOT.jar
├── byte-buddy-1.14.11.jar
├── c3p0-0.9.5.5.jar
├── caffeine-2.9.3.jar
├── checksums-2.22.12.jar
├── checksums-spi-2.22.12.jar
├── classmate-1.5.1.jar
├── cloudevents-json-jackson-2.2.0.jar
├── connect-api-3.7.0.jar
├── connect-file-3.7.0.jar
├── connect-json-3.7.0.jar
├── connect-runtime-3.7.0.jar
├── connect-transforms-3.7.0.jar
├── debezium-api-3.0.6.Final.jar
├── debezium-connector-jdbc-3.0.6.Final.jar
├── debezium-core-3.0.6.Final.jar
├── debezium-sink-3.0.6.Final.jar
├── endpoints-spi-2.22.12.jar
├── error_prone_annotations-2.10.0.jar
├── eventstream-1.0.1.jar
├── glue-2.22.12.jar
├── hibernate-c3p0-6.4.8.Final.jar
├── hibernate-commons-annotations-6.0.6.Final.jar
├── hibernate-core-6.4.8.Final.jar
├── http-auth-aws-2.22.12.jar
├── http-auth-spi-2.22.12.jar
├── http-client-spi-2.22.12.jar
├── identity-spi-2.22.12.jar
├── istack-commons-runtime-4.1.1.jar
├── jackson-annotations-2.12.7.jar
├── jackson-core-2.12.7.jar
├── jackson-databind-2.12.7.jar
├── jackson-datatype-jdk8-2.12.7.jar
├── jackson-datatype-jsr310-2.12.7.jar
├── jackson-module-jaxb-annotations-2.12.7.jar
├── jackson-module-scala_2.13-2.12.7.jar
├── jakarta.activation-api-2.1.0.jar
├── jakarta.inject-api-2.0.1.jar
├── jakarta.persistence-api-3.1.0.jar
├── jakarta.transaction-api-2.0.1.jar
├── jakarta.xml.bind-api-4.0.0.jar
├── jandex-3.1.2.jar
├── jaxb-core-4.0.2.jar
├── jaxb-runtime-4.0.2.jar
├── jboss-logging-3.5.0.Final.jar
├── jcl-over-slf4j-1.7.36.jar
├── jna-5.12.1.jar
├── jna-platform-5.12.1.jar
├── kafka
│ └── connect.jaas
├── kafka-clients-3.7.0.jar
├── kafka.textClipping
├── kafka_2.13-3.7.0.jar
├── log4j-api-2.20.0.jar
├── log4j-core-2.20.0.jar
├── mariadb-java-client-3.2.0.jar
├── mchange-commons-java-0.2.19.jar
├── ojdbc11-21.15.0.0.jar
├── paranamer-2.8.jar
├── profiles-2.22.12.jar
├── protocol-core-2.22.12.jar
├── regions-2.22.12.jar
├── scala-library-2.13.5.jar
├── scala-reflect-2.13.5.jar
├── schema-registry-build-tools-1.1.22.jar
├── schema-registry-common-1.1.22.jar
├── schema-registry-kafkaconnect-converter-1.1.22.jar
├── schema-registry-serde-1.1.22.jar
├── slf4j-api-1.7.36.jar
├── slf4j-reload4j-1.7.36.jar
├── stax-ex-2.1.0.jar
├── sts-2.12.0.jar
├── txw2-4.0.2.jar
├── utils-2.22.12.jar
└── waffle-jna-3.2.0.jar
위의 플로그인을 zip 으로 저장한다음, s3에 업로드 합니다.
이후, MSK 에서 사용자 지정 플러그인으로 구성합니다.
//byte-worker
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schemas.enable=false
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.com.example.kafka.connect.transforms.Base64DecodingTransformation=DEBUG
//mirror-worker
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# 커넥터 클래스: MirrorMaker 2.0의 소스 커넥터
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
# 타겟 클러스터에서 키 변환 시 스키마 사용 안 함
target.key.converter.schemas.enable=false
# 복제 시 사용되는 Kafka topic의 replication factor
replication.factor=2
# 소스 클러스터에서 값 변환 시 스키마 사용 안 함
source.value.converter.schemas.enable=false
# 타겟 클러스터 SASL 인증 설정 (JAAS)
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dotsenergy" password="!dotsenergy";
# 자동 토픽 생성 허용
auto.create.topics.enable=true
# ACL 동기화 비활성화
sync.topic.acls.enabled=false
# 병렬 태스크 수 (보통 1로 설정함)
tasks.max=1
# 복제 정책 클래스 설정 (기본은 소스 이름 접두사 사용)
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# 소스 클러스터의 별칭
source.cluster.alias=source
# 소스 클러스터에서 값 변환기 설정
source.value.converter=org.apache.kafka.connect.storage.StringConverter
# 최소 ISR 설정
min.insync.replicas=1
# 타겟 클러스터에서 값 변환 시 스키마 사용 안 함
target.value.converter.schemas.enable=false
# 프로듀서 ACK 설정 (1이면 리더만 응답)
producer.acks=1
# 타겟 클러스터의 보안 프로토콜
target.cluster.security.protocol=SASL_SSL
# 상태 저장 주제의 replication factor
status.storage.replication.factor=2
# 소스와 타겟 이름을 구분할 구분자
replication.policy.separator=.
# 구성 변경 시 동작: restart
config.action.reload=restart
# 타겟 클러스터에서 자동 토픽 생성 허용
target.cluster.allow.auto.create.topics=true
# 파티션 수 기본 설정
num.partitions=1
# 소스 클러스터 파티션 수 기본 설정
source.cluster.partitions=1
# 타겟 클러스터의 SASL 메커니즘 설정
target.cluster.sasl.mechanism=SCRAM-SHA-512
# 복제 대상 토픽 목록 (예: topic1,topic2 형식으로 설정)
topics={복제할_토픽_리스트_예:topic1,topic2}
# 타겟 클러스터에서 키 변환기 설정
target.key.converter=org.apache.kafka.connect.storage.StringConverter
# 토픽 목록 주기적 새로고침 활성화
refresh.topics.enabled=true
# 타겟 클러스터에서 값 변환기 설정
target.value.converter=org.apache.kafka.connect.storage.StringConverter
# 소스 클러스터에서 키 변환 시 스키마 사용 안 함
source.key.converter.schemas.enable=false
# 최소 ISR 수를 저장에 대해서도 설정
min.insync.replicas.storage=1
# 소스 클러스터 SASL 인증 설정 (JAAS)
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dotsenergy" password="!dotsenergy";
# admin 상태 저장 topic의 replication factor
admin.target.replication.factor=2
# 소스 클러스터 부트스트랩 서버 목록 (예: broker1:9096,broker2:9096)
source.cluster.bootstrap.servers={소스_클러스터_브로커_주소_리스트_예:broker1:9096,broker2:9096}
# 소스 클러스터 SASL 메커니즘
source.cluster.sasl.mechanism=SCRAM-SHA-512
# config 저장 topic의 replication factor
config.storage.replication.factor=2
# 타겟 클러스터의 별칭
target.cluster.alias=target
# 타겟 클러스터 부트스트랩 서버 목록 (예: broker1:9096,broker2:9096)
target.cluster.bootstrap.servers={타겟_클러스터_브로커_주소_리스트_예:broker1:9096,broker2:9096}
# offset 저장 topic의 replication factor
offset.storage.replication.factor=2
# group offset 동기화 시의 replication factor
sync.group.offsets.storage.replication.factor=2
# 소스 클러스터에서 키 변환기 설정
source.key.converter=org.apache.kafka.connect.storage.StringConverter
# 토픽 설정도 동기화 (retention, cleanup 등)
sync.topic.configs.enabled=true
# 타겟 클러스터의 파티션 수 기본 설정
target.cluster.partitions=1
# 소스 클러스터의 보안 프로토콜
source.cluster.security.protocol=SASL_SSL
해당 커넥터에 이전에 만든 플러그인 및 mirror-worker (작업자) 를 넣습니다.
# JDBC Sink Connector 클래스 (Debezium JDBC Connector 사용)
connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
# 오류 메시지를 로그에 포함
errors.log.include.messages=true
# DB 비밀번호 (예: '!yourPassword')
connection.password={!DB_비밀번호_예:!dotsenergy}
# 자동 토픽 생성 허용
auto.create.topics.enable=true
# 커넥터 태스크 수 (1개)
tasks.max=1
# 변환기(transform) 목록: base64 디코딩 후 라우팅
transforms=decodeBase64,routeTable
# DB에서 식별자(컬럼명 등)를 인용(quote) 처리
quote.identifiers=true
# DLQ(Dead Letter Queue)에서 Key Converter의 스키마 비활성화
errors.deadletterqueue.key.converter.schemas.enable=false
# DLQ 메시지에 컨텍스트 헤더 포함
errors.deadletterqueue.context.headers.enable=true
# 테이블 라우팅 시 토픽 이름을 사용해 테이블명을 재구성
transforms.routeTable.replacement=device_data_$1_$2_$3
# 스키마 자동 진화 비활성화 (DB 컬럼 변경 무시)
auto.evolve=false
# DLQ 토픽의 replication factor
errors.deadletterqueue.topic.replication.factor=2
# Kafka 값 변환기: byte array (base64 디코딩 전 처리용)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# insert 모드 (기본적으로 INSERT 쿼리 수행)
insert.mode=insert
# 오류 발생 시 로그 기록 활성화
errors.log.enable=true
# Kafka 키 변환기 설정 (일반 문자열)
key.converter=org.apache.kafka.connect.storage.StringConverter
# DLQ에서 사용하는 값 변환기 (JSON 사용)
errors.deadletterqueue.value.converter=org.apache.kafka.connect.json.JsonConverter
# 라우팅 변환기 설정: Regex 기반 토픽 리네이밍
transforms.routeTable.type=org.apache.kafka.connect.transforms.RegexRouter
# 테이블명 포맷 설정 (토픽명을 그대로 사용)
table.name.format=${topic}
# 오류 발생 시 재시도 타임아웃 (ms 단위)
errors.retry.timeout=300000
# DLQ에서 사용하는 키 변환기
errors.deadletterqueue.key.converter=org.apache.kafka.connect.storage.StringConverter
# Kafka 토픽 목록 (복제된 토픽)
topics={토픽_리스트_예:source.I-qtm-guro,source.R-qtm-guro}
# DLQ에 컨텍스트 정보 포함
errors.deadletterqueue.context.enable=true
# DB 사용자명
connection.username={!DB_사용자명_예:dotsenergy}
# 라우팅 시 사용할 정규식 패턴
# 예: source.I-qtm-guro → device_data_I_qtm_guro
transforms.routeTable.regex=source\.(I|R)-(.*)-(.*)
# JDBC 드라이버 설정 (MariaDB)
connection.driver=org.mariadb.jdbc.Driver
# 오류 재시도 최대 대기 시간 (ms)
errors.retry.delay.max.ms=60000
# DB에 삽입할 필드만 화이트리스트 (컬럼 필터링)
fields.whitelist=id,deviceId,data
# 키 변환기의 스키마 비활성화
key.converter.schemas.enable=false
# DLQ 토픽 이름
errors.deadletterqueue.topic.name=qtm-dlq
# DLQ 값 변환기에서 스키마 비활성화
errors.deadletterqueue.value.converter.schemas.enable=false
# 값 변환기에서 스키마 비활성화
value.converter.schemas.enable=false
# 커스텀 Base64 디코딩 변환기 클래스 (직접 구현 필요)
transforms.decodeBase64.type=com.example.kafka.connect.transforms.Base64DecodingTransformation
# 오류 허용 수준 (all: 모든 오류 허용하고 진행)
errors.tolerance=all
# 테이블 자동 생성 비활성화
auto.create=false
# DB 연결 URL (MariaDB)
connection.url={JDBC_URL_예:jdbc:mariadb://your-db-host:3306/your-db-name}
해당 커넥터에 이전에 만든 플러그인 및 byte-worker (작업자) 를 넣습니다.
mariaDB
CREATE TABLE t_device_data_11 (
sequence_id BIGINT AUTO_INCREMENT PRIMARY KEY,
id VARCHAR(255) NOT NULL,
deviceId VARCHAR(255) NOT NULL,
data LONGTEXT,
event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);