EKS 클러스터 환경에서 Kafka 클러스터를 구성하기 위해서는 단일 카프카 POD를 구성할 때와 많은 차이점이 있었다.
이에 대해 정리가 필요해 작성했다.
카프카 클러스터는 3개의 Zookeeper와 4개의 Kafka Borker가 필요하다.
Zookeeper는 서로의 존재를 네트워크상에서 알고 있어야 한다.
Kafka Borker ID는 고유해야 한다.
Kafka Broker는 서로 통신할 수 있는 규약을 정의해주어야한다.
Kafka Client는 Kafka Broker들의 주소를 모두 알고 있어야 한다.
위 요구사항을 모두 만족시키기 위해서 카프카 브로커 Deployment와 서비스를 1:1로 맵핑시켜야 Advertised_listener, 즉, 통신 리스너의 선점 문제를 해결할 수 있었고, 클러스터가 정상 작동할 수 있었다.
PLAINTEXT://
: 보안 없이 평문 통신을 하는 프로토콜이다. 내부 네트워크에서 Kafka Client와 통신할 때 사용한다.
INTERNAL://
: 브로커 내부에서 사용하는 리스너이다. 리더와 팔로워 통신에 사용된다.
apiVersion: v1
kind: Service
metadata:
name: zk-hs
labels:
app: zk
spec:
ports:
- port: 2888
name: server
- port: 3888
name: leader-election
selector:
app: zk
---
apiVersion: v1
kind: Service
metadata:
name: zk-cs
labels:
app: zk
spec:
ports:
- port: 2181
name: client
selector:
app: zk
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: zk-pdb
spec:
selector:
matchLabels:
app: zk
maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper
spec:
selector:
matchLabels:
app: zk
serviceName: zk-hs
replicas: 3
updateStrategy:
type: RollingUpdate
podManagementPolicy: OrderedReady
template:
metadata:
labels:
app: zk
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
containers:
- name: kubernetes-zookeeper
imagePullPolicy: Always
image: "registry.k8s.io/kubernetes-zookeeper:1.0-3.4.10"
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
command:
- sh
- -c
- "start-zookeeper \
--servers=3 \
--data_dir=/var/lib/zookeeper/data \
--data_log_dir=/var/lib/zookeeper/data/log \
--conf_dir=/opt/zookeeper/conf \
--client_port=2181 \
--election_port=3888 \
--server_port=2888 \
--tick_time=2000 \
--init_limit=10 \
--sync_limit=5 \
--heap=512M \
--max_client_cnxns=60 \
--snap_retain_count=3 \
--purge_interval=12 \
--max_session_timeout=40000 \
--min_session_timeout=4000 \
--log_level=INFO"
readinessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
livenessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
volumeMounts:
- name: datadir
mountPath: /var
volumes:
- name: datadir
persistentVolumeClaim:
claimName: zk-prod-pvc
securityContext:
runAsUser: 1000
fsGroup: 1000
주키퍼를 Deployment로 관리하려했지만, zookeeper 클러스터 내부에서 2888포트로 통신이 안되는 트러블과, 서비스 네임을 통한 호스트 이름을 찾을 수 없다는 트러블이 발생했고, 이에 따라서 많은 트러블 슈팅을 해봤으나 ( POD 네트워크에서 2888 포트 curl, service-name driven host curl 등등 ) 결론은 zookeeper를 start시킬 sh 명령어가 없는것이 문제라는 것을 냈다.
그래서 공식문서를 참조해 StatefulSet을 사용해 Zookeeper를 배포했고 볼륨을 할당해주었다.
K8S Zookeeper 공식문서
Zookeeper Stateful Set
브로커는 zk-cs-test 서비스를 사용하면 되고, 주키퍼끼리는 zk-hs-test를 통해 내부 리더를 선별하는 통신을 한다.
여기서 문제가 발생했다.
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
Affinity
여기에서 requiredDuringSchedulingIgnoredDuringExecution
에 있는 topologyKet에 hostname이 고유한 hostname, 즉 한 번 배포된 node에는 배포되지 않는다는 제약조건을 걸어놨고, 프로젝트에서 사용하는 클러스터에는 node가 2개였기 때문에 마지막 zookeeper pod가 pending상태에서 벗어나지 못했다.
이에 따라 node가 2개인 환경에서도 가능할까 하여 preferred로 바꿔보려 했으나 attribute값이 완전히 달라 node의 수를 3개로 해서 배포했다.
주키퍼의 배포 개수를 2개로 줄이면 되지 않냐는 말도 있었지만 Zookeeper cluster는 고가용성을 위해 반드시 홀수개로 구성해야한다.
공식문서에서 hostname을 topologyKey로 준 이유도 역시 node가 죽었을 때 해당 node에 Zookeeper POD가 2개 띄어진 상황이 된다면 kafka cluster역시 바로 죽어버리기 때문에 고가용성을 위함이라는 정보를 얻게 됐다.
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-deployment-0
labels:
app: kafka-0
spec:
replicas: 1
selector:
matchLabels:
app: kafka-0
template:
metadata:
labels:
app: kafka-0
spec:
containers:
- name: broker
image: confluentinc/cp-kafka:5.5.11
ports:
- containerPort: 9092
env:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "2"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INTERNAL
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "false"
- name: KAFKA_LISTENERS
value: INTERNAL://:9093,PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: INTERNAL://kafka-service-0:9093,PLAINTEXT://kafka-service-0:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_ZOOKEEPER_CONNECT
value: zk-cs:2181
volumeMounts:
- name: kafka
mountPath: /var
volumes:
- name: kafka
persistentVolumeClaim:
claimName: kf-prod-pvc-1
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-deployment-1
labels:
app: kafka-1
spec:
replicas: 1
selector:
matchLabels:
app: kafka-1
template:
metadata:
labels:
app: kafka-1
spec:
containers:
- name: broker
image: confluentinc/cp-kafka:5.5.11
ports:
- containerPort: 9092
env:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "2"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INTERNAL
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "false"
- name: KAFKA_LISTENERS
value: INTERNAL://:9093,PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: INTERNAL://kafka-service-1:9093,PLAINTEXT://kafka-service-1:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_ZOOKEEPER_CONNECT
value: zk-cs:2181
volumeMounts:
- name: kafka
mountPath: /var
volumes:
- name: kafka
persistentVolumeClaim:
claimName: kf-prod-pvc-2
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-deployment-2
labels:
app: kafka-2
spec:
replicas: 1
selector:
matchLabels:
app: kafka-2
template:
metadata:
labels:
app: kafka-2
spec:
containers:
- name: broker
image: confluentinc/cp-kafka:5.5.11
ports:
- containerPort: 9092
env:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "2"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INTERNAL
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "false"
- name: KAFKA_LISTENERS
value: INTERNAL://:9093,PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: INTERNAL://kafka-service-2:9093,PLAINTEXT://kafka-service-2:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_ZOOKEEPER_CONNECT
value: zk-cs:2181
volumeMounts:
- name: kafka
mountPath: /var
volumes:
- name: kafka
persistentVolumeClaim:
claimName: kf-prod-pvc-3
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-deployment-3
labels:
app: kafka-3
spec:
replicas: 1
selector:
matchLabels:
app: kafka-3
template:
metadata:
labels:
app: kafka-3
spec:
containers:
- name: broker
image: confluentinc/cp-kafka:5.5.11
ports:
- containerPort: 9092
env:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "2"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INTERNAL
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "false"
- name: KAFKA_LISTENERS
value: INTERNAL://:9093,PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: INTERNAL://kafka-service-3:9093,PLAINTEXT://kafka-service-3:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_ZOOKEEPER_CONNECT
value: zk-cs:2181
volumeMounts:
- name: kafka
mountPath: /var
volumes:
- name: kafka
persistentVolumeClaim:
claimName: kf-prod-pvc-4
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service-0
spec:
selector:
app: kafka-0
ports:
- name: client-port
protocol: TCP
port: 9092
targetPort: 9092
- name: broker-port
protocol: TCP
port: 9093
targetPort: 9093
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service-1
spec:
selector:
app: kafka-1
ports:
- name: client-port
protocol: TCP
port: 9092
targetPort: 9092
- name: broker-port
protocol: TCP
port: 9093
targetPort: 9093
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service-2
spec:
selector:
app: kafka-2
ports:
- name: client-port
protocol: TCP
port: 9092
targetPort: 9092
- name: broker-port
protocol: TCP
port: 9093
targetPort: 9093
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service-3
spec:
selector:
app: kafka-3
ports:
- name: client-port
protocol: TCP
port: 9092
targetPort: 9092
- name: broker-port
protocol: TCP
port: 9093
targetPort: 9093
K8S 환경에서 Deployment로 배포한 POD들의 IP는 계속 변할 수 있으므로 SERVICE를 통한 단일 진입점으로 각 POD들을 맵핑시켰다.
K8S 환경에서 서비스의 단일 진입점 도메인은 Service의 name으로 사용할 수 있다.
중요한 점은 kafka 브로커들이 하나의 클러스터에 한 번에 모이지 않는 경우가 발생할 수 있는데 ( bootstrap.servers 의 요소의 개수가 kafka 브로커의 수와 같지 않음 ) 단순 초기화 오류로 해당 kafka broker pod를 삭제하고 다시 띄우면된다. ( deployment 삭제가 아님을 주의하자 )
spring:
kafka:
bootstrap-servers: kafka-service-test-0:9092,kafka-service-test-1:9092,kafka-service-test-2:9092,kafka-service-test-3:9092
server:
port: 9000
mks와 kafka on eks의 차이가 어떤게 있나요? 비용면에서 차이가 큰가요?