Kafka & Strimzi Operator

유재혁 · November 17, 2023

1. Kafka

1.1 What Is Kafka?

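Kafka is a high-performance distributed event streaming platform designed for data pipelines, streaming analytics, data integration, and mission-critical applications. It follows the publish/subscribe (pub-sub) model, works much like a message queue, and is built for distributed environments. More than 80% of Fortune 100 companies use Kafka, and adoption in Korea is growing as well.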

1.2 Why Was Kafka Created?

Kafka was developed at LinkedIn, the business-oriented social networking service.
The following is LinkedIn's data processing system before Kafka was developed.

1.3 Problems with the Existing Data System

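Each application and database was connected end to end (every pipeline was fragmented), and as requirements grew, the data system became increasingly complex, which caused the following problems.

  1. Increased system complexity
  • With no unified transport layer, data flows were hard to trace and the system was hard to manage
  • When a failure occurred somewhere, it took longer to resolve (=> every connected application had to be checked)
  • Hardware replacements and software upgrades added management points and took longer (=> every connected application had to be checked for side effects)
  2. Difficulty managing data pipelines
  • Each application and data system had its own separate pipeline, and each pipeline used a different data format and processing method
  • Adding new pipelines became hard, which reduced scalability and flexibility, and the risk of data inconsistency lowered reliability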

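To solve these problems, LinkedIn needed a new system and built one with the following goal:

Build a system that can deliver data to every other system, supports real-time processing, and scales easily to keep up with a rapidly growing service!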

1.4 LinkedIn's Data Processing System After Adopting Kafka


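Adopting Kafka alleviated the problems described above to a considerable degree.

  • All event and data flows can be managed centrally
  • A new service or system only needs to connect to Kafka using the standard format Kafka provides, which improves scalability and reliability
  • Developers can focus on each service's business logic instead of on the connections between services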
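To make the complexity argument concrete, here is a rough back-of-the-envelope model. It is a simplification, not LinkedIn's actual numbers: it assumes that in the point-to-point design every source system is wired to every sink system (N × M pipelines), while with a central Kafka hub each system needs only one connection (N + M). The helper name `pipeline_count` is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: rough count of data pipelines needed.
# Assumption: in the point-to-point model every source is wired to every sink.
pipeline_count() {
  local model=$1 sources=$2 sinks=$3
  case "$model" in
    point-to-point) echo $(( sources * sinks )) ;;  # one pipeline per (source, sink) pair
    hub)            echo $(( sources + sinks )) ;;  # each system connects once, to Kafka
    *) echo "unknown model: $model" >&2; return 1 ;;
  esac
}

# Example: 4 source systems and 5 sink systems
echo "point-to-point: $(pipeline_count point-to-point 4 5)"   # 20
echo "hub (Kafka):    $(pipeline_count hub 4 5)"              # 9
```

Adding a sixth sink costs 4 new pipelines in the point-to-point model but only 1 new connection with the hub, which is the scalability gain described above.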

2. Strimzi Operator

2.1 What Is the Strimzi Operator?

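The Strimzi operator simplifies installing and operating Kafka on Kubernetes. It uses Kubernetes custom resource definitions (CRDs) to install ZooKeeper and Kafka clusters, and resources such as topics are also managed as custom resources.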

2.2 Architecture

The cluster operator deploys and manages each component through custom resources.

  • cluster operator: deploys and manages components such as the Kafka cluster and the ZooKeeper cluster
  • entity operator: manages the user operator and the topic operator
  • topic operator: manages topics (creation, deletion, etc.)
  • user operator: manages Kafka users
  • zookeeper cluster: stores Kafka metadata and checks broker health
  • kafka cluster: the Kafka cluster itself (made up of multiple brokers)

2.3 Installation

# Create the namespace
root@rook-01:~# kubectl create namespace kafka
namespace/kafka created

# Add the Helm repo and show the chart's default values
root@rook-01:~# helm repo add strimzi https://strimzi.io/charts/
"strimzi" has been added to your repositories
root@rook-01:~# helm show values strimzi/strimzi-kafka-operator
# Default values for strimzi-kafka-operator.

# Default replicas for the cluster operator
replicas: 1

# If you set `watchNamespaces` to the same value as ``.Release.Namespace` (e.g. `helm ... --namespace $NAMESPACE`),
# the chart will fail because duplicate RoleBindings will be attempted to be created in the same namespace
watchNamespaces: []
watchAnyNamespace: false

defaultImageRegistry: quay.io
defaultImageRepository: strimzi
defaultImageTag: 0.38.0

image:
  registry: ""
  repository: ""
  name: operator
  tag: ""
  # imagePullSecrets:
  #   - name: secretname
logVolume: co-config-volume
logConfigMap: strimzi-cluster-operator
logConfiguration: ""
logLevel: ${env:STRIMZI_LOG_LEVEL:-INFO}
fullReconciliationIntervalMs: 120000
operationTimeoutMs: 300000
kubernetesServiceDnsDomain: cluster.local
featureGates: ""
tmpDirSizeLimit: 1Mi

# Example on how to configure extraEnvs
# extraEnvs:
#   - name: JAVA_OPTS
#     value: "-Xms256m -Xmx256m"

extraEnvs: []

tolerations: []
affinity: {}
annotations: {}
labels: {}
nodeSelector: {}
priorityClassName: ""

podSecurityContext: {}
securityContext: {}
rbac:
  create: yes
serviceAccountCreate: yes
serviceAccount: strimzi-cluster-operator

leaderElection:
  enable: true

# If you are using the grafana dashboard sidecar,
# you can import some default dashboards here
dashboards:
  enabled: false
  namespace: ~
  label: grafana_dashboard # this is the default value from the grafana chart
  labelValue: "1" # this is the default value from the grafana chart
  annotations: {}
  extraLabels: {}

# Docker images that operator uses to provision various components of Strimzi. To use your own registry prefix the
# repository name with your registry URL.
# Ex) repository: registry.xyzcorp.com/strimzi/kafka
kafka:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
kafkaConnect:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
topicOperator:
  image:
    registry: ""
    repository: ""
    name: operator
    tag: ""
userOperator:
  image:
    registry:
    repository:
    name: operator
    tag: ""
kafkaInit:
  image:
    registry: ""
    repository: ""
    name: operator
    tag: ""
tlsSidecarEntityOperator:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
kafkaMirrorMaker:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
kafkaBridge:
  image:
    registry: ""
    repository:
    name: kafka-bridge
    tag: 0.27.0
kafkaExporter:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
kafkaMirrorMaker2:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
cruiseControl:
  image:
    registry: ""
    repository: ""
    name: kafka
    tagPrefix: ""
kanikoExecutor:
  image:
    registry: ""
    repository: ""
    name: kaniko-executor
    tag: ""
mavenBuilder:
  image:
    registry: ""
    repository: ""
    name: maven-builder
    tag: ""
resources:
  limits:
    memory: 384Mi
    cpu: 1000m
  requests:
    memory: 384Mi
    cpu: 200m
livenessProbe:
  initialDelaySeconds: 10
  periodSeconds: 30
readinessProbe:
  initialDelaySeconds: 10
  periodSeconds: 30

createGlobalResources: true
# Create clusterroles that extend existing clusterroles to interact with strimzi crds
# Ref: https://kubernetes.io/docs/reference/access-authn-authz/rbac/#aggregated-clusterroles
createAggregateRoles: false
# Override the exclude pattern for exclude some labels
labelsExclusionPattern: ""
# Controls whether Strimzi generates network policy resources (By default true)
generateNetworkPolicy: true
# Override the value for Connect build timeout
connectBuildTimeoutMs: 300000

# Install the chart: deploys the operator pod
root@rook-01:~# helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka
NAME: kafka-operator
LAST DEPLOYED: Fri Nov 17 20:44:07 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.38.0

To create a Kafka cluster refer to the following documentation.

https://strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str

# Check the deployed resources: operator Deployment (pod)
root@rook-01:~# kubectl get deploy,pod -n kafka
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/strimzi-cluster-operator   1/1     1            1           37s

NAME                                            READY   STATUS    RESTARTS   AGE
pod/strimzi-cluster-operator-5d6f48c6f9-k9d8t   1/1     Running   0          37s

# Check the Kafka versions supported by the operator
root@rook-01:~# kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
      STRIMZI_KAFKA_IMAGES:                               3.5.0=quay.io/strimzi/kafka:0.38.0-kafka-3.5.0
                                                          3.5.1=quay.io/strimzi/kafka:0.38.0-kafka-3.5.1
                                                          3.6.0=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
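The `spec.kafka.version` you set in a Kafka resource has to be one of these versions. As a small sketch of how the value is structured, the snippet below treats `STRIMZI_KAFKA_IMAGES` as newline-separated `version=image` pairs (an assumption based on how `kubectl describe` prints it above) and looks up the image for one version. The helper `image_for_version` is hypothetical and runs locally on the copied value, not against the cluster.

```bash
#!/usr/bin/env bash
# Sample value copied from the describe output above
# (assumed format: one "version=image" pair per line).
STRIMZI_KAFKA_IMAGES='3.5.0=quay.io/strimzi/kafka:0.38.0-kafka-3.5.0
3.5.1=quay.io/strimzi/kafka:0.38.0-kafka-3.5.1
3.6.0=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0'

# Hypothetical helper: print the image mapped to a given Kafka version.
image_for_version() {
  local wanted=$1 pair
  while IFS= read -r pair; do
    [ -z "$pair" ] && continue
    if [ "${pair%%=*}" = "$wanted" ]; then   # text before the first '=' is the version
      echo "${pair#*=}"                      # text after the first '=' is the image
      return 0
    fi
  done <<< "$STRIMZI_KAFKA_IMAGES"
  echo "Kafka $wanted is not supported by this operator" >&2
  return 1
}

image_for_version 3.6.0   # quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
```

kafka-1.yaml in the next section uses `version: 3.6.0`, which appears in this list.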

# Check the deployed resources: CRDs (each one roughly corresponds to a feature the operator provides)
root@rook-01:~# kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io                         2023-11-17T11:44:06Z
kafkaconnectors.kafka.strimzi.io                      2023-11-17T11:44:06Z
kafkaconnects.kafka.strimzi.io                        2023-11-17T11:44:06Z
kafkamirrormaker2s.kafka.strimzi.io                   2023-11-17T11:44:06Z
kafkamirrormakers.kafka.strimzi.io                    2023-11-17T11:44:06Z
kafkanodepools.kafka.strimzi.io                       2023-11-17T11:44:06Z
kafkarebalances.kafka.strimzi.io                      2023-11-17T11:44:06Z
kafkas.kafka.strimzi.io                               2023-11-17T11:44:06Z
kafkatopics.kafka.strimzi.io                          2023-11-17T11:44:06Z
kafkausers.kafka.strimzi.io                           2023-11-17T11:44:06Z
strimzipodsets.core.strimzi.io                        2023-11-17T11:44:06Z

3. Kafka Hands-on

3.1 Installing the Kafka Cluster

# Download the Kafka cluster YAML and review it: 3 listeners, podAntiAffinity
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml

root@rook-01:~/kafka# cat kafka-1.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: nodeport
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: true
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - kafka
                topologyKey: "topology.ebs.csi.aws.com/zone"

  zookeeper:
    replicas: 3
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - zookeeper
                topologyKey: "topology.ebs.csi.aws.com/zone"

  entityOperator:
    topicOperator: {}
    userOperator: {}
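The replication settings in `config` determine how many broker failures the cluster can absorb. With `default.replication.factor: 3` and `min.insync.replicas: 2`, a producer using `acks=all` keeps getting writes acknowledged as long as at least 2 replicas of the partition are in sync, so each partition can lose replication factor minus min.insync.replicas = 1 replica. The helper below is a hypothetical sketch of that arithmetic only; it does not model leader election or reads.

```bash
#!/usr/bin/env bash
# Hypothetical helper: how many replicas of a partition can be lost while
# producers using acks=all still get their writes acknowledged.
# Kafka rejects acks=all writes when the ISR shrinks below min.insync.replicas.
tolerated_failures() {
  local rf=$1 min_isr=$2
  if (( min_isr > rf )); then
    echo "min.insync.replicas ($min_isr) exceeds replication factor ($rf)" >&2
    return 1
  fi
  echo $(( rf - min_isr ))
}

# Values from kafka-1.yaml: default.replication.factor: 3, min.insync.replicas: 2
tolerated_failures 3 2   # prints 1
```

Lowering `min.insync.replicas` to 1 would tolerate 2 lost replicas, at the cost of acknowledging writes that exist on only one broker.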

# Deploy the Kafka cluster: Kafka (3 brokers), ZooKeeper (3 nodes), entityOperator Deployment
## Pod anti-affinity is applied at deployment time via requiredDuringSchedulingIgnoredDuringExecution
root@rook-01:~/kafka# kubectl apply -f kafka-1.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created

# Check the deployed resources
root@rook-01:~/kafka# kubectl get all -n kafka
NAME                                              READY   STATUS    RESTARTS   AGE
pod/my-cluster-entity-operator-75765b6dbb-6ttcx   3/3     Running   0          28s
pod/my-cluster-kafka-0                            1/1     Running   0          83s
pod/my-cluster-kafka-1                            1/1     Running   0          83s
pod/my-cluster-kafka-2                            1/1     Running   0          83s
pod/my-cluster-zookeeper-0                        1/1     Running   0          2m32s
pod/my-cluster-zookeeper-1                        1/1     Running   0          2m32s
pod/my-cluster-zookeeper-2                        1/1     Running   0          2m32s
pod/strimzi-cluster-operator-5d6f48c6f9-k9d8t     1/1     Running   0          5m30s

NAME                                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
service/my-cluster-kafka-0                    NodePort    10.103.49.87     <none>        9094:32141/TCP                                 85s
service/my-cluster-kafka-1                    NodePort    10.103.201.88    <none>        9094:32139/TCP                                 85s
service/my-cluster-kafka-2                    NodePort    10.111.53.95     <none>        9094:32140/TCP                                 85s
service/my-cluster-kafka-bootstrap            ClusterIP   10.105.188.180   <none>        9091/TCP,9092/TCP,9093/TCP                     85s
service/my-cluster-kafka-brokers              ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   85s
service/my-cluster-kafka-external-bootstrap   NodePort    10.102.173.46    <none>        9094:32299/TCP                                 85s
service/my-cluster-zookeeper-client           ClusterIP   10.105.18.24     <none>        2181/TCP                                       2m34s
service/my-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP                     2m34s

NAME                                         READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-cluster-entity-operator   1/1     1            1           28s
deployment.apps/strimzi-cluster-operator     1/1     1            1           5m30s

NAME                                                    DESIRED   CURRENT   READY   AGE
replicaset.apps/my-cluster-entity-operator-75765b6dbb   1         1         1       28s
replicaset.apps/strimzi-cluster-operator-5d6f48c6f9     1         1         1       5m30s


# Check the deployed resources: the Kafka brokers are installed after ZooKeeper finishes installing
root@rook-01:~/kafka# kubectl get kafka -n kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   3                        3                     True
root@rook-01:~/kafka# kubectl get cm,secret -n kafka
NAME                                                DATA   AGE
configmap/kube-root-ca.crt                          1      6m45s
configmap/my-cluster-entity-topic-operator-config   1      53s
configmap/my-cluster-entity-user-operator-config    1      53s
configmap/my-cluster-kafka-0                        3      109s
configmap/my-cluster-kafka-1                        3      109s
configmap/my-cluster-kafka-2                        3      108s
configmap/my-cluster-zookeeper-config               2      2m58s
configmap/strimzi-cluster-operator                  1      5m55s

NAME                                            TYPE                 DATA   AGE
secret/my-cluster-clients-ca                    Opaque               1      3m
secret/my-cluster-clients-ca-cert               Opaque               3      3m
secret/my-cluster-cluster-ca                    Opaque               1      3m
secret/my-cluster-cluster-ca-cert               Opaque               3      3m
secret/my-cluster-cluster-operator-certs        Opaque               4      3m
secret/my-cluster-entity-topic-operator-certs   Opaque               4      53s
secret/my-cluster-entity-user-operator-certs    Opaque               4      53s
secret/my-cluster-kafka-brokers                 Opaque               12     109s
secret/my-cluster-zookeeper-nodes               Opaque               12     2m58s
secret/sh.helm.release.v1.kafka-operator.v1     helm.sh/release.v1   1      5m55s


# Check the deployed resources: Kafka/ZooKeeper StrimziPodSets are created (no StatefulSets are used)
root@rook-01:~/kafka# kubectl get strimzipodsets -n kafka
NAME                   PODS   READY PODS   CURRENT PODS   AGE
my-cluster-kafka       3      3            3              2m4s
my-cluster-zookeeper   3      3            3              3m14s

# Check the deployed resources: Services (including headless ones) and EndpointSlices for the 3 listeners
root@rook-01:~/kafka# kubectl get svc,endpointslice -n kafka
NAME                                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
service/my-cluster-kafka-0                    NodePort    10.103.49.87     <none>        9094:32141/TCP                                 2m17s
service/my-cluster-kafka-1                    NodePort    10.103.201.88    <none>        9094:32139/TCP                                 2m17s
service/my-cluster-kafka-2                    NodePort    10.111.53.95     <none>        9094:32140/TCP                                 2m17s
service/my-cluster-kafka-bootstrap            ClusterIP   10.105.188.180   <none>        9091/TCP,9092/TCP,9093/TCP                     2m17s
service/my-cluster-kafka-brokers              ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   2m17s
service/my-cluster-kafka-external-bootstrap   NodePort    10.102.173.46    <none>        9094:32299/TCP                                 2m17s
service/my-cluster-zookeeper-client           ClusterIP   10.105.18.24     <none>        2181/TCP                                       3m26s
service/my-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP                     3m26s

NAME                                                                       ADDRESSTYPE   PORTS                        ENDPOINTS                                      AGE
endpointslice.discovery.k8s.io/my-cluster-kafka-0-257vt                    IPv4          9094                         10.244.125.243                                 2m16s
endpointslice.discovery.k8s.io/my-cluster-kafka-1-ln88l                    IPv4          9094                         10.244.125.244                                 2m16s
endpointslice.discovery.k8s.io/my-cluster-kafka-2-mhlr4                    IPv4          9094                         10.244.125.245                                 2m16s
endpointslice.discovery.k8s.io/my-cluster-kafka-bootstrap-xjpbj            IPv4          9093,9091,9092               10.244.125.243,10.244.125.245,10.244.125.244   2m16s
endpointslice.discovery.k8s.io/my-cluster-kafka-brokers-845jb              IPv4          9093,9091,9090 + 2 more...   10.244.125.243,10.244.125.245,10.244.125.244   2m17s
endpointslice.discovery.k8s.io/my-cluster-kafka-external-bootstrap-whkl9   IPv4          9094                         10.244.125.243,10.244.125.245,10.244.125.244   2m16s
endpointslice.discovery.k8s.io/my-cluster-zookeeper-client-p49ng           IPv4          2181                         10.244.140.248,10.244.125.242,10.244.155.125   3m26s
endpointslice.discovery.k8s.io/my-cluster-zookeeper-nodes-k8fc2            IPv4          3888,2888,2181               10.244.140.248,10.244.125.242,10.244.155.125   3m26s

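# Check the Kafka cluster listeners: 9092 is the plain listener, 9093 is the listener named tls (kafka-1.yaml sets tls: false on it, so it is not actually encrypted), and the third entry is the NodePort address for external access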
root@rook-01:~/kafka# kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
[
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9092
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9092",
    "name": "plain"
  },
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9093
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9093",
    "name": "tls"
  },
  {
    "addresses": [
      {
        "host": "172.19.187.145",
        "port": 32299
      }
    ],
    "bootstrapServers": "172.19.187.145:32299",
    "name": "external"
  }
]

3.2 Creating a Test Pod and Checking Kafka Cluster Information

# Download the file
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml

VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -

root@rook-01:~/kafka# kubectl get pod -l name=kafkaclient -owide
NAME             READY   STATUS    RESTARTS   AGE   IP               NODE      NOMINATED NODE   READINESS GATES
myclient-9cn65   1/1     Running   0          31s   10.244.155.126   rook-01   <none>           <none>
myclient-gvhbs   1/1     Running   0          31s   10.244.140.251   rook-02   <none>           <none>
myclient-qpfzm   1/1     Running   0          31s   10.244.125.247   rook-03   <none>           <none>

# List the Kafka tools included in the Kafka client
root@rook-01:~/kafka# kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
connect-distributed.sh        kafka-metadata-quorum.sh
connect-mirror-maker.sh       kafka-metadata-shell.sh
connect-plugin-path.sh        kafka-mirror-maker.sh
connect-standalone.sh         kafka-producer-perf-test.sh
kafka-acls.sh                 kafka-reassign-partitions.sh
kafka-broker-api-versions.sh  kafka-replica-verification.sh
kafka-cluster.sh              kafka-run-class.sh
kafka-configs.sh              kafka-server-start.sh
kafka-console-consumer.sh     kafka-server-stop.sh
kafka-console-producer.sh     kafka-storage.sh
kafka-consumer-groups.sh      kafka-streams-application-reset.sh
kafka-consumer-perf-test.sh   kafka-topics.sh
kafka-delegation-tokens.sh    kafka-transactions.sh
kafka-delete-records.sh       kafka-verifiable-consumer.sh
kafka-dump-log.sh             kafka-verifiable-producer.sh
kafka-e2e-latency.sh          trogdor.sh
kafka-features.sh             windows
kafka-get-offsets.sh          zookeeper-security-migration.sh
kafka-jmx.sh                  zookeeper-server-start.sh
kafka-leader-election.sh      zookeeper-server-stop.sh
kafka-log-dirs.sh             zookeeper-shell.sh


# Store the Kafka bootstrap service DNS name and port in a variable
root@rook-01:~/kafka# SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
root@rook-01:~/kafka# echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile

# Broker information
root@rook-01:~/kafka# kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null) -> (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 15 [usable: 15],
        ListOffsets(2): 0 to 8 [usable: 8],
        Metadata(3): 0 to 12 [usable: 12],
        LeaderAndIsr(4): 0 to 7 [usable: 7],
        StopReplica(5): 0 to 4 [usable: 4],
        UpdateMetadata(6): 0 to 8 [usable: 8],
        ControlledShutdown(7): 0 to 3 [usable: 3],
        OffsetCommit(8): 0 to 8 [usable: 8],
        OffsetFetch(9): 0 to 8 [usable: 8],
        FindCoordinator(10): 0 to 4 [usable: 4],
        JoinGroup(11): 0 to 9 [usable: 9],
        Heartbeat(12): 0 to 4 [usable: 4],
        LeaveGroup(13): 0 to 5 [usable: 5],
        SyncGroup(14): 0 to 5 [usable: 5],
        DescribeGroups(15): 0 to 5 [usable: 5],
        ListGroups(16): 0 to 4 [usable: 4],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 3 [usable: 3],
        CreateTopics(19): 0 to 7 [usable: 7],
        DeleteTopics(20): 0 to 6 [usable: 6],
        DeleteRecords(21): 0 to 2 [usable: 2],
        InitProducerId(22): 0 to 4 [usable: 4],
        OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
        AddPartitionsToTxn(24): 0 to 4 [usable: 4],
        AddOffsetsToTxn(25): 0 to 3 [usable: 3],
        EndTxn(26): 0 to 3 [usable: 3],
        WriteTxnMarkers(27): 0 to 1 [usable: 1],
        TxnOffsetCommit(28): 0 to 3 [usable: 3],
        DescribeAcls(29): 0 to 3 [usable: 3],
        CreateAcls(30): 0 to 3 [usable: 3],
        DeleteAcls(31): 0 to 3 [usable: 3],
        DescribeConfigs(32): 0 to 4 [usable: 4],
        AlterConfigs(33): 0 to 2 [usable: 2],
        AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
        DescribeLogDirs(35): 0 to 4 [usable: 4],
        SaslAuthenticate(36): 0 to 2 [usable: 2],
        CreatePartitions(37): 0 to 3 [usable: 3],
        CreateDelegationToken(38): 0 to 3 [usable: 3],
        RenewDelegationToken(39): 0 to 2 [usable: 2],
        ExpireDelegationToken(40): 0 to 2 [usable: 2],
        DescribeDelegationToken(41): 0 to 3 [usable: 3],
        DeleteGroups(42): 0 to 2 [usable: 2],
        ElectLeaders(43): 0 to 2 [usable: 2],
        IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
        AlterPartitionReassignments(45): 0 [usable: 0],
        ListPartitionReassignments(46): 0 [usable: 0],
        OffsetDelete(47): 0 [usable: 0],
        DescribeClientQuotas(48): 0 to 1 [usable: 1],
        AlterClientQuotas(49): 0 to 1 [usable: 1],
        DescribeUserScramCredentials(50): 0 [usable: 0],
        AlterUserScramCredentials(51): 0 [usable: 0],
        DescribeQuorum(55): UNSUPPORTED,
        AlterPartition(56): 0 to 3 [usable: 3],
        UpdateFeatures(57): 0 to 1 [usable: 1],
        Envelope(58): 0 [usable: 0],
        DescribeCluster(60): 0 [usable: 0],
        DescribeProducers(61): 0 [usable: 0],
        UnregisterBroker(64): UNSUPPORTED,
        DescribeTransactions(65): 0 [usable: 0],
        ListTransactions(66): 0 [usable: 0],
        AllocateProducerIds(67): 0 [usable: 0],
        ConsumerGroupHeartbeat(68): UNSUPPORTED
)
my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) -> (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 15 [usable: 15],
        ListOffsets(2): 0 to 8 [usable: 8],
        Metadata(3): 0 to 12 [usable: 12],
        LeaderAndIsr(4): 0 to 7 [usable: 7],
        StopReplica(5): 0 to 4 [usable: 4],
        UpdateMetadata(6): 0 to 8 [usable: 8],
        ControlledShutdown(7): 0 to 3 [usable: 3],
        OffsetCommit(8): 0 to 8 [usable: 8],
        OffsetFetch(9): 0 to 8 [usable: 8],
        FindCoordinator(10): 0 to 4 [usable: 4],
        JoinGroup(11): 0 to 9 [usable: 9],
        Heartbeat(12): 0 to 4 [usable: 4],
        LeaveGroup(13): 0 to 5 [usable: 5],
        SyncGroup(14): 0 to 5 [usable: 5],
        DescribeGroups(15): 0 to 5 [usable: 5],
        ListGroups(16): 0 to 4 [usable: 4],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 3 [usable: 3],
        CreateTopics(19): 0 to 7 [usable: 7],
        DeleteTopics(20): 0 to 6 [usable: 6],
        DeleteRecords(21): 0 to 2 [usable: 2],
        InitProducerId(22): 0 to 4 [usable: 4],
        OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
        AddPartitionsToTxn(24): 0 to 4 [usable: 4],
        AddOffsetsToTxn(25): 0 to 3 [usable: 3],
        EndTxn(26): 0 to 3 [usable: 3],
        WriteTxnMarkers(27): 0 to 1 [usable: 1],
        TxnOffsetCommit(28): 0 to 3 [usable: 3],
        DescribeAcls(29): 0 to 3 [usable: 3],
        CreateAcls(30): 0 to 3 [usable: 3],
        DeleteAcls(31): 0 to 3 [usable: 3],
        DescribeConfigs(32): 0 to 4 [usable: 4],
        AlterConfigs(33): 0 to 2 [usable: 2],
        AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
        DescribeLogDirs(35): 0 to 4 [usable: 4],
        SaslAuthenticate(36): 0 to 2 [usable: 2],
        CreatePartitions(37): 0 to 3 [usable: 3],
        CreateDelegationToken(38): 0 to 3 [usable: 3],
        RenewDelegationToken(39): 0 to 2 [usable: 2],
        ExpireDelegationToken(40): 0 to 2 [usable: 2],
        DescribeDelegationToken(41): 0 to 3 [usable: 3],
        DeleteGroups(42): 0 to 2 [usable: 2],
        ElectLeaders(43): 0 to 2 [usable: 2],
        IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
        AlterPartitionReassignments(45): 0 [usable: 0],
        ListPartitionReassignments(46): 0 [usable: 0],
        OffsetDelete(47): 0 [usable: 0],
        DescribeClientQuotas(48): 0 to 1 [usable: 1],
        AlterClientQuotas(49): 0 to 1 [usable: 1],
        DescribeUserScramCredentials(50): 0 [usable: 0],
        AlterUserScramCredentials(51): 0 [usable: 0],
        DescribeQuorum(55): UNSUPPORTED,
        AlterPartition(56): 0 to 3 [usable: 3],
        UpdateFeatures(57): 0 to 1 [usable: 1],
        Envelope(58): 0 [usable: 0],
        DescribeCluster(60): 0 [usable: 0],
        DescribeProducers(61): 0 [usable: 0],
        UnregisterBroker(64): UNSUPPORTED,
        DescribeTransactions(65): 0 [usable: 0],
        ListTransactions(66): 0 [usable: 0],
        AllocateProducerIds(67): 0 [usable: 0],
        ConsumerGroupHeartbeat(68): UNSUPPORTED
)
my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null) -> (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 15 [usable: 15],
        ListOffsets(2): 0 to 8 [usable: 8],
        Metadata(3): 0 to 12 [usable: 12],
        LeaderAndIsr(4): 0 to 7 [usable: 7],
        StopReplica(5): 0 to 4 [usable: 4],
        UpdateMetadata(6): 0 to 8 [usable: 8],
        ControlledShutdown(7): 0 to 3 [usable: 3],
        OffsetCommit(8): 0 to 8 [usable: 8],
        OffsetFetch(9): 0 to 8 [usable: 8],
        FindCoordinator(10): 0 to 4 [usable: 4],
        JoinGroup(11): 0 to 9 [usable: 9],
        Heartbeat(12): 0 to 4 [usable: 4],
        LeaveGroup(13): 0 to 5 [usable: 5],
        SyncGroup(14): 0 to 5 [usable: 5],
        DescribeGroups(15): 0 to 5 [usable: 5],
        ListGroups(16): 0 to 4 [usable: 4],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 3 [usable: 3],
        CreateTopics(19): 0 to 7 [usable: 7],
        DeleteTopics(20): 0 to 6 [usable: 6],
        DeleteRecords(21): 0 to 2 [usable: 2],
        InitProducerId(22): 0 to 4 [usable: 4],
        OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
        AddPartitionsToTxn(24): 0 to 4 [usable: 4],
        AddOffsetsToTxn(25): 0 to 3 [usable: 3],
        EndTxn(26): 0 to 3 [usable: 3],
        WriteTxnMarkers(27): 0 to 1 [usable: 1],
        TxnOffsetCommit(28): 0 to 3 [usable: 3],
        DescribeAcls(29): 0 to 3 [usable: 3],
        CreateAcls(30): 0 to 3 [usable: 3],
        DeleteAcls(31): 0 to 3 [usable: 3],
        DescribeConfigs(32): 0 to 4 [usable: 4],
        AlterConfigs(33): 0 to 2 [usable: 2],
        AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
        DescribeLogDirs(35): 0 to 4 [usable: 4],
        SaslAuthenticate(36): 0 to 2 [usable: 2],
        CreatePartitions(37): 0 to 3 [usable: 3],
        CreateDelegationToken(38): 0 to 3 [usable: 3],
        RenewDelegationToken(39): 0 to 2 [usable: 2],
        ExpireDelegationToken(40): 0 to 2 [usable: 2],
        DescribeDelegationToken(41): 0 to 3 [usable: 3],
        DeleteGroups(42): 0 to 2 [usable: 2],
        ElectLeaders(43): 0 to 2 [usable: 2],
        IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
        AlterPartitionReassignments(45): 0 [usable: 0],
        ListPartitionReassignments(46): 0 [usable: 0],
        OffsetDelete(47): 0 [usable: 0],
        DescribeClientQuotas(48): 0 to 1 [usable: 1],
        AlterClientQuotas(49): 0 to 1 [usable: 1],
        DescribeUserScramCredentials(50): 0 [usable: 0],
        AlterUserScramCredentials(51): 0 [usable: 0],
        DescribeQuorum(55): UNSUPPORTED,
        AlterPartition(56): 0 to 3 [usable: 3],
        UpdateFeatures(57): 0 to 1 [usable: 1],
        Envelope(58): 0 [usable: 0],
        DescribeCluster(60): 0 [usable: 0],
        DescribeProducers(61): 0 [usable: 0],
        UnregisterBroker(64): UNSUPPORTED,
        DescribeTransactions(65): 0 [usable: 0],
        ListTransactions(66): 0 [usable: 0],
        AllocateProducerIds(67): 0 [usable: 0],
        ConsumerGroupHeartbeat(68): UNSUPPORTED
)

# List topics
root@rook-01:~/kafka# kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic

3.3 Creating Topics

mytopic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: ${TOPICNAME}
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 1
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2

# Create a topic the Kubernetes-native way (KafkaTopic resource): 1 partition, 3 replicas, using envsubst
root@rook-01:~/kafka# curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml
root@rook-01:~/kafka# TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
kafkatopic.kafka.strimzi.io/mytopic1 created

root@rook-01:~/kafka# kubectl get kafkatopics -n kafka
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3                    True
mytopic1                                                                                           my-cluster   1            3                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3                    True

# Create a topic with the Kafka CLI: 1 partition, replication factor 3
root@rook-01:~/kafka# kubectl exec -it ds/myclient -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000
Created topic mytopic2.

# Show topic details
root@rook-01:~/kafka# kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: vwomymGLTfmYwwBil0XYEw PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
        Topic: mytopic2 Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
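The describe output shows ReplicationFactor 3, an ISR of 0,2,1 and min.insync.replicas=2. In Kafka, a producer using acks=all has its writes rejected (NotEnoughReplicas) once the ISR shrinks below min.insync.replicas, so this topic can lose one replica from the ISR and still accept acks=all writes; producers using acks=1 are not subject to this check. A small bash sketch of that arithmetic follows (all helper names are hypothetical).

```bash
# Count broker IDs in an ISR field such as "0,2,1".
isr_size() {
  local -a ids
  IFS=, read -ra ids <<< "$1"
  echo "${#ids[@]}"
}

# Exit 0 if an acks=all produce request would be accepted for this ISR size.
acks_all_accepted() {
  local isr=$1 min_isr=$2
  (( isr >= min_isr ))
}

# How many replicas can drop out of the ISR before acks=all writes fail.
tolerated_isr_loss() {
  local rf=$1 min_isr=$2
  echo $(( rf - min_isr ))
}

isr=$(isr_size "0,2,1")   # from "Isr: 0,2,1" above
echo "ISR size: $isr"
echo "replicas that may fall out of the ISR: $(tolerated_isr_loss 3 2)"
acks_all_accepted 1 2 || echo "with only 1 in-sync replica, acks=all writes are rejected"
```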

4. KEDA

4.1 What is KEDA?

Kubernetes Event-Driven Autoscaling

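The pod autoscaler built into Kubernetes (the Horizontal Pod Autoscaler, HPA) dynamically scales the number of pods based on CPU/memory utilization. Scaling on external indicators instead (e.g., job queue size or HTTP request rate) takes considerably more work: the metrics have to be collected into a metrics backend such as Prometheus, and the HPA then has to be configured to read them through an external metrics server. The KEDA project was started as open source to make this easy. It is a CNCF project, and it moved from incubating to graduated status in August 2023.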

4.2 KEDA Architecture


KEDA is a lightweight Kubernetes component built to receive events from a variety of sources and autoscale application pods based on them.

It performs the following three roles.

Agent
Activates/deactivates the application Deployment depending on whether there are events (scaling it to and from zero). This is performed by the keda-operator pod.

Metrics
Acts as a metrics server that exposes the various event metrics (like the Kubernetes metrics server). The exposed metrics are consumed by the Kubernetes Horizontal Pod Autoscaler. This is performed by the keda-operator-metrics-apiserver pod.

Admission Webhooks
Automatically validates changes to KEDA resources (e.g., it rejects multiple ScaledObject resources that target the same scale target). This is performed by the keda-admission-webhooks pod.

4.3 How It Works and Key Characteristics

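When a user creates a ScaledObject resource to put a workload under KEDA's management, the admission webhook validates it.
If the resource is valid, the ScaledObject is created and KEDA automatically creates an HPA that it manages.
That auto-generated HPA reads its metrics from keda-operator-metrics-apiserver rather than from the Kubernetes metrics server.
In other words, apart from the 0↔1 activation handled by the agent, KEDA does not itself perform the scaling that adjusts the number of application pods; the HPA does.
(KEDA was designed to extend and complement the Kubernetes Horizontal Pod Autoscaler, not to replace it.)
The figure below, included to aid the explanation above, compares an HPA setup that uses the metrics server provided as a Kubernetes add-on with an HPA setup that uses KEDA.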
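To make this division of labor concrete: with KEDA's Kafka scaler, the HPA that KEDA creates targets an average lag per replica (the trigger's lagThreshold), so the replica count it asks for is roughly ceil(total lag / lagThreshold), clamped to minReplicaCount/maxReplicaCount. By default (allowIdleConsumers: false) the Kafka scaler also keeps this from exceeding the topic's partition count, since extra consumers in the same group would sit idle. The bash function below is a simplified model of that calculation only. It ignores the HPA's tolerance, its stabilization windows and the 0↔1 activation step, and its name is hypothetical.

```bash
# Simplified model of the replica count requested for a Kafka lag trigger.
# Args: total_lag lag_threshold min_replicas max_replicas partitions
desired_replicas() {
  local lag=$1 threshold=$2 min=$3 max=$4 partitions=$5
  local n=$(( (lag + threshold - 1) / threshold ))   # ceil(lag / threshold)
  if (( n > partitions )); then n=$partitions; fi   # no idle consumers by default
  if (( n > max )); then n=$max; fi
  if (( n < min )); then n=$min; fi
  echo "$n"
}

desired_replicas 5 1 1 10 3   # prints 3: lag asks for 5, but there are only 3 partitions
desired_replicas 0 1 1 10 3   # prints 1: no lag, so it stays at the minimum
```

With a threshold as low as 1, the partition count of the consumed topic is usually what limits how far the consumer Deployment can grow.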

4.4 KEDA Setup

# Install KEDA
root@rook-01:~/kafka# kubectl create namespace keda
namespace/keda created
root@rook-01:~/kafka# helm repo add kedacore https://kedacore.github.io/charts
"kedacore" has been added to your repositories
root@rook-01:~/kafka# helm install keda kedacore/keda --version 2.12.0 --namespace keda
NAME: keda
LAST DEPLOYED: Fri Nov 17 21:09:36 2023
NAMESPACE: keda
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
:::^.     .::::^:     :::::::::::::::    .:::::::::.                   .^.
7???~   .^7????~.     7??????????????.   :?????????77!^.              .7?7.
7???~  ^7???7~.       ~!!!!!!!!!!!!!!.   :????!!!!7????7~.           .7???7.
7???~^7????~.                            :????:    :~7???7.         :7?????7.
7???7????!.           ::::::::::::.      :????:      .7???!        :7??77???7.
7????????7:           7???????????~      :????:       :????:      :???7?5????7.
7????!~????^          !77777777777^      :????:       :????:     ^???7?#P7????7.
7???~  ^????~                            :????:      :7???!     ^???7J#@J7?????7.
7???~   :7???!.                          :????:   .:~7???!.    ~???7Y&@#7777????7.
7???~    .7???7:      !!!!!!!!!!!!!!!    :????7!!77????7^     ~??775@@@GJJYJ?????7.
7???~     .!????^     7?????????????7.   :?????????7!~:      !????G@@@@@@@@5??????7:
::::.       :::::     :::::::::::::::    .::::::::..        .::::JGGGB@@@&7:::::::::
                                                                      ?@@#~
                                                                      P@B^
                                                                    :&G:
                                                                    !5.
                                                                    .Kubernetes Event-driven Autoscaling (KEDA) - Application autoscaling made simple.

Get started by deploying Scaled Objects to your cluster:
    - Information about Scaled Objects : https://keda.sh/docs/latest/concepts/
    - Samples: https://github.com/kedacore/samples

Get information about the deployed ScaledObjects:
  kubectl get scaledobject [--namespace <namespace>]

Get details about a deployed ScaledObject:
  kubectl describe scaledobject <scaled-object-name> [--namespace <namespace>]

Get information about the deployed ScaledObjects:
  kubectl get triggerauthentication [--namespace <namespace>]

Get details about a deployed ScaledObject:
  kubectl describe triggerauthentication <trigger-authentication-name> [--namespace <namespace>]

Get an overview of the Horizontal Pod Autoscalers (HPA) that KEDA is using behind the scenes:
  kubectl get hpa [--all-namespaces] [--namespace <namespace>]

Learn more about KEDA:
- Documentation: https://keda.sh/
- Support: https://keda.sh/support/
- File an issue: https://github.com/kedacore/keda/issues/new/choose

# Verify the KEDA installation
root@rook-01:~/kafka# kubectl get crd | grep keda
clustertriggerauthentications.keda.sh                 2023-11-17T12:09:37Z
scaledjobs.keda.sh                                    2023-11-17T12:09:37Z
scaledobjects.keda.sh                                 2023-11-17T12:09:37Z
triggerauthentications.keda.sh                        2023-11-17T12:09:37Z

# Deploy the consumer application: the consumer also causes the topic (my-topic) to be created
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-deploy-svc.yaml
cat keda-deploy-svc.yaml | yh
kubectl apply -f keda-deploy-svc.yaml
kubectl get pod -n keda -l app=consumer-service

# Verify
kubectl get kafkatopics -n kafka
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic my-topic --describe
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group keda-consumer --describe
kubectl logs -n keda -l app=consumer-service -f
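kafka-consumer-groups.sh --describe prints one row per partition with CURRENT-OFFSET (the group's committed offset), LOG-END-OFFSET and LAG columns, where LAG = LOG-END-OFFSET - CURRENT-OFFSET. Summing the rows gives the group's total lag, which is the quantity KEDA's Kafka trigger scales on. Below is a minimal awk-based sketch; the function name sum_lag is hypothetical, it assumes the default column order, and it skips partitions whose offset is shown as "-" (nothing committed yet), so it can undercount in that case.

```bash
# Sum the lag over all partition rows of `kafka-consumer-groups.sh --describe`.
# Column 4 = CURRENT-OFFSET, column 5 = LOG-END-OFFSET; header lines and rows
# with a non-numeric offset ("-") are skipped.
sum_lag() {
  awk '$4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ { total += $5 - $4 }
       END { print total + 0 }'
}

# Usage (no -t, so no TTY control characters end up in the piped output):
# kubectl exec ds/myclient -- kafka-consumer-groups.sh --bootstrap-server "$SVCDNS" \
#   --group keda-consumer --describe | sum_lag
```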

# Watch the pods scale up/down in kube-ops-view

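# Create the KEDA scaling policy: pods are added once the LAG threshold of 1 is reached, i.e., consumer instances increase when the producer traffic rate pushes lag above the threshold
# Consumer LAG = (messages the producer has written, i.e., messages still held in Kafka) - (messages the consumer has fetched)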
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-scale.yaml
cat keda-scale.yaml | yh
kubectl apply -f keda-scale.yaml
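The contents of keda-scale.yaml are not shown here. For orientation only, the sketch below renders a ScaledObject of the general shape KEDA's Kafka trigger uses (apiVersion keda.sh/v1alpha1, a kafka trigger with bootstrapServers, consumerGroup, topic and lagThreshold). It is a hypothetical reconstruction, not the file's actual contents: the Deployment name consumer-service is assumed from the app=consumer-service label, the bootstrap address assumes Strimzi's <cluster>-kafka-bootstrap Service naming, and the polling, cooldown and replica values are placeholders.

```bash
# Hypothetical helper: print a KEDA ScaledObject that scales a Deployment on Kafka consumer lag.
# Args: deployment bootstrap_servers consumer_group topic lag_threshold
render_scaledobject() {
  local target=$1 bootstrap=$2 group=$3 topic=$4 lag=$5
  cat <<EOF
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ${target}-scaler
spec:
  scaleTargetRef:
    name: ${target}          # Deployment to scale (assumed name)
  pollingInterval: 30        # seconds between lag checks (placeholder)
  cooldownPeriod: 300        # seconds before scaling back to zero (placeholder)
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: ${bootstrap}
      consumerGroup: ${group}
      topic: ${topic}
      lagThreshold: "${lag}"   # trigger metadata values are strings
EOF
}

# Usage (assumed names; compare with keda-scale.yaml before applying):
# render_scaledobject consumer-service my-cluster-kafka-bootstrap.kafka.svc:9092 \
#   keda-consumer my-topic 1 | kubectl apply -n keda -f -
```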

# Monitoring
watch 'kubectl get ScaledObject,hpa,pod -n keda' 
kubectl get ScaledObject,hpa -n keda
kubectl logs -n keda -l app=consumer-service

# (Terminal 1) Send messages repeatedly in a for loop
for ((i=1; i<=100;  i++)); do echo "keda-scale-test-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic my-topic" ; date ; done
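The loop above starts a new kubectl exec and a new producer process for every message. If you only need to generate load, the sketch below sends all messages through a single producer session instead. gen_messages is a hypothetical helper, and the approach relies on kafka-console-producer.sh treating each stdin line as one message. Because the messages then arrive in a burst rather than roughly one per exec, lag builds up faster, so the scale-out may look different from the original loop.

```bash
# Print COUNT messages of the form PREFIX-1 ... PREFIX-COUNT, one per line.
gen_messages() {
  local count=$1 prefix=$2 i
  for (( i = 1; i <= count; i++ )); do
    echo "${prefix}-${i}"
  done
}

# Usage: stream 100 messages into one producer (-i forwards stdin; no TTY needed)
# gen_messages 100 test1 | kubectl exec -i ds/myclient -- \
#   kafka-console-producer.sh --bootstrap-server "$SVCDNS" --topic my-topic
```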

# Monitoring: check the scaled-out consumer pods
kubectl get pod -n keda -l app=consumer-service

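# Once you stop sending messages, the consumer pods automatically scale back down to the minimum of 1 after a while (by default the HPA waits out a 300-second scale-down stabilization window)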