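I used the image published by Debezium. Setting the environment variables took a long time: at first I thought each variable needed a CONNECT_ prefix, but it turned out they have to be written without it.
I deployed it as a StatefulSet.
BOOTSTRAP_SERVERS -> put the addresses of the Kafka brokers you set up here.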
apiVersion: v1
kind: Service
metadata:
  name: connect-service
  namespace: debezium
  labels:
    name: connect
spec:
  ports:
    - port: 8083
      name: debezium-port
      protocol: TCP
      nodePort: 30651
  selector:
    app: connect
  type: LoadBalancer
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: connect
  namespace: debezium
spec:
  selector:
    matchLabels:
      app: connect
  serviceName: connect-service
  replicas: 2
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: OrderedReady
  template:
    metadata:
      labels:
        app: connect
    spec:
      restartPolicy: Always
      containers:
        - name: kafka-connect
          image: debezium/connect
          resources:
            requests:
              cpu: "1"
              memory: 1Gi
            # limits:
            #   cpu: "2"
            #   memory: 1Gi
          ports:
            - containerPort: 8083
              name: connect
            - containerPort: 3306
              name: sql
          volumeMounts:
            - name: connect
              # mountPath should be an absolute path inside the container
              mountPath: /var/lib/plugin
          env:
            # Expose the pod IP so it can be referenced below as $(MY_POD_IP)
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: BOOTSTRAP_SERVERS
              value: kafka-broker-0.kafka-service.default.svc.cluster.local:9092,kafka-broker-1.kafka-service.default.svc.cluster.local:9092,kafka-broker-2.kafka-service.default.svc.cluster.local:9092
            - name: GROUP_ID
              value: connect
            - name: CONFIG_STORAGE_TOPIC
              value: configs
            - name: CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: OFFSET_STORAGE_TOPIC
              value: offsets
            - name: OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: STATUS_STORAGE_TOPIC
              value: statuses
            - name: STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: KEY_CONVERTER
              value: org.apache.kafka.connect.storage.StringConverter
            - name: VALUE_CONVERTER
              value: org.apache.kafka.connect.converters.ByteArrayConverter
            - name: INTERNAL_KEY_CONVERTER
              value: org.apache.kafka.connect.json.JsonConverter
            - name: INTERNAL_VALUE_CONVERTER
              value: org.apache.kafka.connect.json.JsonConverter
            - name: REST_ADVERTISED_HOST_NAME
              value: $(MY_POD_IP)
            - name: REPLICATION_FACTOR
              value: "1"
            - name: REST_ADVERTISED_LISTENER
              value: PLAINTEXT://$(MY_POD_IP)
  volumeClaimTemplates:
    - metadata:
        name: connect
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: "kafka-debezium"
        resources:
          requests:
            storage: 10Gi
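The BOOTSTRAP_SERVERS value is just a comma-separated list of per-pod DNS names, so it can be generated rather than typed by hand. The following is a minimal sketch, assuming the brokers run as a StatefulSet named kafka-broker behind a service named kafka-service in the default namespace (inferred from the hostnames in the manifest). The function name bootstrap_servers is made up for illustration.

```python
def bootstrap_servers(statefulset, service, namespace, replicas, port=9092):
    """Build a comma-separated broker list from StatefulSet pod DNS names.

    Assumes each pod is reachable as
    <statefulset>-<ordinal>.<service>.<namespace>.svc.cluster.local
    """
    hosts = [
        f"{statefulset}-{i}.{service}.{namespace}.svc.cluster.local:{port}"
        for i in range(replicas)
    ]
    # No spaces around the commas: the value is passed to Kafka as-is.
    return ",".join(hosts)


# Names below are assumptions taken from the hostnames used in this post.
print(bootstrap_servers("kafka-broker", "kafka-service", "default", 3))
```

If your brokers use a different StatefulSet name, service name, namespace, or port, change the arguments accordingly.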
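The PV name is up to you. Since the StatefulSet runs two replicas, each pod's claim needs its own PV unless the storage class provisions volumes dynamically.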
apiVersion: v1
kind: PersistentVolume
metadata:
  name: connect-0
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: "kafka-debezium"
  hostPath:
    path: /data/
    type: DirectoryOrCreate
The Debezium image already includes the Debezium connectors, so there is no need to download them separately.
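When I tested it, REST requests worked at the following addresses:
From outside the cluster: public IP:nodePort
From inside the cluster: pod IP:debezium-port (8083), or node private IP:nodePort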
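To check one of these addresses, you can call the Kafka Connect REST API, whose GET /connectors endpoint returns a JSON array of connector names. This is a rough sketch using only the Python standard library. The helper names connect_url and list_connectors and the IP addresses are placeholders for illustration, not values from my cluster.

```python
import json
import urllib.request


def connect_url(host, port, path="/connectors"):
    """Build the URL of a Kafka Connect REST endpoint."""
    return f"http://{host}:{port}{path}"


def list_connectors(host, port, timeout=5):
    """Return the connector names reported by GET /connectors."""
    with urllib.request.urlopen(connect_url(host, port), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Placeholder addresses (assumptions); substitute your own IPs.
print(connect_url("203.0.113.10", 30651))  # outside: public IP + nodePort
print(connect_url("10.0.0.5", 8083))       # inside: pod IP + debezium-port
```

Calling list_connectors("203.0.113.10", 30651) from a machine that can reach the address should return an empty list until a connector has been registered.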