FlinkSessionJob (w/ k8s) Guide

Chan hae OH · September 17, 2025

1. Flink Usage Guide

This is a general guide to running Flink on Kubernetes.
Deployment to k8s is done with the Flink Kubernetes Operator.

2. Prepare the Namespace (first time only)

# look up the namespace
kubectl get ns | grep flink-learning-lab
# create it if it does not exist
kubectl create ns flink-learning-lab
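
The look-up-then-create steps above can be folded into one idempotent helper. This is a minimal sketch: the function name `ensure_namespace` is hypothetical, and it relies on `kubectl get namespace <name>`, which matches the exact name rather than a substring as `grep` does.

```shell
# Create the namespace only if it does not already exist.
# Hypothetical helper; requires kubectl with access to the cluster.
ensure_namespace() {
  local ns="$1"
  if kubectl get namespace "$ns" >/dev/null 2>&1; then
    echo "namespace $ns already exists"
  else
    kubectl create namespace "$ns"
  fi
}

# Usage:
#   ensure_namespace flink-learning-lab
```

Running it a second time is harmless, so it can sit at the top of a setup script.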

3. Start the Flink Kubernetes Operator (first time only)

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/concepts/architecture/

Installation reference: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/try-flink-kubernetes-operator/quick-start/

Flink Kubernetes Operator v1.11 and later support Flink v2.0.

Documentation PR for version 1.12: https://github.com/apache/flink-kubernetes-operator/pull/1027

Documentation PR for version 1.11: https://github.com/apache/flink-kubernetes-operator/pull/1028

3.1. Install cert-manager

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

3.2. Register the Helm repository (first time, local)

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.12.0/

3.3. Download the Helm chart

helm pull flink-operator-repo/flink-kubernetes-operator
tar -zvxf flink-kubernetes-operator-1.12.0-helm.tgz
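
The repository URL and the archive name both embed the operator version, so it may help to derive them from a single variable. This is a sketch under stated assumptions: the helper names are hypothetical, the default version simply mirrors the repository URL used in 3.2, and the archive name pattern is taken from the tar command above.

```shell
# Assumption: one variable holds the operator version so the repository URL
# and the archive name cannot drift apart.
OPERATOR_VERSION="${OPERATOR_VERSION:-1.12.0}"

# Print the Helm repository URL for a given operator version.
operator_repo_url() {
  printf 'https://downloads.apache.org/flink/flink-kubernetes-operator-%s/\n' "$1"
}

# Print the chart archive name for a given operator version
# (pattern copied from the tar command in this guide).
operator_chart_archive() {
  printf 'flink-kubernetes-operator-%s-helm.tgz\n' "$1"
}

# Usage (requires helm and network access):
#   helm repo add flink-operator-repo "$(operator_repo_url "$OPERATOR_VERSION")"
#   helm pull flink-operator-repo/flink-kubernetes-operator --version "$OPERATOR_VERSION"
#   tar -zxvf "$(operator_chart_archive "$OPERATOR_VERSION")"
```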

Edit conf/flink-conf.yaml as shown below.

For testing purposes, there is actually nothing that needs to be changed.

flink-conf.yaml
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Flink job/cluster related configs
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

# These parameters are required for Java 17 support.
# Flink 1.18 uses env.java.opts.all, if a user supplies their own version of these opts in their FlinkDeployment the options below will be overridden.
# env.java.default-opts.all is used for 1.19 onwards so users can supply their own opts.all in their Job deployments and have these appended.
kubernetes.operator.default-configuration.flink-version.v1_18.env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
kubernetes.operator.default-configuration.flink-version.v1_19+.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
# Flink operator related configs
# kubernetes.operator.reconcile.interval: 60 s
# kubernetes.operator.reconcile.parallelism: 5
# kubernetes.operator.flink.client.cancel.timeout: 1 min
# kubernetes.operator.resource.cleanup.timeout: 60 s
# kubernetes.operator.observer.rest-ready.delay: 10 s
# kubernetes.operator.observer.progress-check.interval: 10 s
# kubernetes.operator.observer.savepoint.trigger.grace-period: 10 s
# kubernetes.operator.flink.client.timeout: 10 s
# kubernetes.operator.deployment.rollback.enabled: false
# kubernetes.operator.deployment.readiness.timeout: 5min
# kubernetes.operator.user.artifacts.base.dir: /opt/flink/artifacts
# kubernetes.operator.job.upgrade.ignore-pending-savepoint: false
# kubernetes.operator.watched.namespaces: ns1,ns2
# kubernetes.operator.label.selector: flink=enabled
# kubernetes.operator.dynamic.namespaces.enabled: false
# kubernetes.operator.retry.initial.interval: 5 s
# kubernetes.operator.retry.interval.multiplier: 2
# kubernetes.operator.retry.max.attempts: 10
# kubernetes.operator.exception.stacktrace.enabled: false
# kubernetes.operator.exception.stacktrace.max.length: 2048
# kubernetes.operator.exception.field.max.length: 2048
# kubernetes.operator.exception.throwable.list.max.count: 2
# kubernetes.operator.exception.label.mapper: Job has already been submitted:duplicatedJobFound,Server returned HTTP response code:httpResponseCodeFound
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
kubernetes.execution.shutdown-on-application-finish: true

# kubernetes.operator.snapshot.resource.enabled: true
# kubernetes.operator.savepoint.dispose-on-delete: true

# kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
# kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

4. Deploy with Helm

helm install flink-kubernetes-operator -n flink-kubernetes-operator ./flink-kubernetes-operator --create-namespace
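
After the install, it is worth confirming that the operator actually came up before moving on. The helper below is a hedged sketch: `wait_for_operator` is a hypothetical name, it assumes the Deployment is named after the Helm release used above (flink-kubernetes-operator), and the 120s default timeout is an arbitrary choice.

```shell
# Block until the operator Deployment reports its rollout as complete.
# Assumptions: Deployment name = flink-kubernetes-operator, timeout = 120s.
wait_for_operator() {
  local ns="${1:-flink-kubernetes-operator}"
  local timeout="${2:-120s}"
  kubectl rollout status deployment/flink-kubernetes-operator \
    --namespace "$ns" --timeout "$timeout"
}

# Usage:
#   wait_for_operator
#   wait_for_operator flink-kubernetes-operator 300s
```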

5. Flink RBAC (first time only)

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/operations/rbac/

Service Account
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-sa
  namespace: flink-learning-lab
RBAC
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.0.1
  namespace: flink-learning-lab
  name: flink-role
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  verbs:
  - '*'
- apiGroups: ["flink.apache.org"]
  resources: ["flinkdeployments"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding
  namespace: flink-learning-lab
subjects:
  - kind: ServiceAccount
    name: flink-sa
    namespace: flink-learning-lab
roleRef:
  kind: Role
  name: flink-role
  apiGroup: rbac.authorization.k8s.io
  
kubectl apply -f rbac
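
Once the Role and RoleBinding are applied, the permissions can be checked from the command line with `kubectl auth can-i` and impersonation. A minimal sketch, assuming your own user is allowed to impersonate service accounts; the helper name `sa_can` is hypothetical.

```shell
# Ask the API server whether a ServiceAccount may perform a verb on a resource.
# kubectl prints "yes" or "no".
sa_can() {
  local ns="$1" sa="$2" verb="$3" resource="$4"
  kubectl auth can-i "$verb" "$resource" \
    --namespace "$ns" \
    --as "system:serviceaccount:${ns}:${sa}"
}

# Usage:
#   sa_can flink-learning-lab flink-sa create pods
#   sa_can flink-learning-lab flink-sa list flinkdeployments.flink.apache.org
```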

6. Deploy the Flink Session Cluster

1.17.2
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-learning-lab-cluster-117
  namespace: flink-learning-lab
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "16"
    web.submit.enable: "true"
    web.cancel.enable: "true"
  serviceAccount: flink-sa
  jobManager:
    replicas: 1
    resource:
      memory: 32G
      cpu: 8
  taskManager:
    replicas: 5
    resource:
      memory: 64G
      cpu: 16
  podTemplate:
    spec:
      initContainers:
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              cd /flink-libs

              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.2/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/1.17.2/flink-table-api-java-bridge-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.17.2/flink-avro-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.17.2/flink-avro-confluent-registry-1.17.2.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Seoul
          volumeMounts:
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.17.jar
              subPath: flink-sql-connector-kafka-3.1.0-1.17.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-table-api-java-bridge-1.17.2.jar
              subPath: flink-table-api-java-bridge-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.17.2.jar
              subPath: flink-avro-confluent-registry-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.1.0.jar
              subPath: kafka-clients-3.1.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.17.2.jar
              subPath: flink-avro-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      imagePullSecrets:
        - name: harbor-creds
      volumes:
        - name: flink-lib-volume
          emptyDir: {}
  mode: standalone
1.19.3
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-learning-lab-cluster-119
  namespace: flink-learning-lab
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "16"
    web.submit.enable: "true"
    web.cancel.enable: "true"
  serviceAccount: flink-sa
  jobManager:
    replicas: 1
    resource:
      memory: 32G
      cpu: 8
  taskManager:
    replicas: 5
    resource:
      memory: 64G
      cpu: 16
  podTemplate:
    spec:
      initContainers:
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |

              cd /flink-libs

              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.19.3/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              # wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.3.0-1.19/flink-connector-kafka-3.3.0-1.19.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.19.3/flink-avro-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.19.3/flink-avro-confluent-registry-1.19.3.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Seoul
          volumeMounts:
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.19.jar
              subPath: flink-sql-connector-kafka-3.3.0-1.19.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.3.0.jar
              subPath: kafka-clients-3.3.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.19.3.jar
              subPath: flink-avro-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.19.3.jar
              subPath: flink-avro-confluent-registry-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      imagePullSecrets:
        - name: harbor-creds
      volumes:
        - name: flink-lib-volume
          emptyDir: {}
  mode: standalone
kubectl apply -f flink-session-cluster-{version}.yaml
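
The manifests above list every jar twice, once as a download and once as a mount. Generating the mount entries from the URL list keeps the two from drifting apart. The following is only a sketch: `jar_volume_mounts` is a hypothetical helper, and the default volume name and 12-space indent are assumptions taken from the FlinkDeployment manifests in this section.

```shell
# Print one volumeMounts entry per jar URL read from stdin.
# $1 = volume name (default: flink-lib-volume)
# $2 = indent prefix (default: 12 spaces, matching the podTemplate above)
jar_volume_mounts() {
  local volume="${1:-flink-lib-volume}"
  local indent="${2-            }"
  local url jar
  while IFS= read -r url; do
    [ -n "$url" ] || continue        # skip blank lines
    jar="${url##*/}"                 # file name = last URL path segment
    printf '%s- name: %s\n' "$indent" "$volume"
    printf '%s  mountPath: /opt/flink/lib/%s\n' "$indent" "$jar"
    printf '%s  subPath: %s\n' "$indent" "$jar"
  done
}

# Usage (jars.txt is a hypothetical file with one jar URL per line):
#   jar_volume_mounts < jars.txt
```

The output can be pasted under `volumeMounts:` of the flink-main-container.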

7. Deploy the Flink SQL Gateway

1.17.2
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-sql-gateway-config-117
  namespace: flink-learning-lab
data:
  # This file is mounted into the SQL Gateway Pod.
  flink-conf.yaml: |+
    # =================================================================
    # Flink SQL Gateway settings
    # =================================================================
    # Session timeout (e.g. close a session after 1 hour of inactivity)
    sql-gateway.session.idle-timeout: 1h
    # Interval at which session state is checked
    sql-gateway.session.check-interval: 1m
    sql-gateway.endpoint.rest.address: 0.0.0.0
    sql-gateway.endpoint.rest.port: 8083
    # =================================================================
    # Flink cluster connection and general settings
    # =================================================================
    # These settings can also be passed as command-line arguments in the Deployment.
    # If both are specified, the command-line arguments take precedence.
    jobmanager.rpc.address: flink-learning-lab-cluster-117-rest.flink-learning-lab.svc.cluster.local
    jobmanager.rpc.port: 6123
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-sql-gateway-117
  namespace: flink-learning-lab
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-sql-gateway-117
  template:
    metadata:
      labels:
        app: flink-sql-gateway-117
    spec:
      initContainers:
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              cd /flink-libs

              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.2/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/1.17.2/flink-table-api-java-bridge-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.17.2/flink-avro-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.17.2/flink-avro-confluent-registry-1.17.2.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
      - name: sql-gateway
        image: flink:1.17.2-scala_2.12-java8
        command: ["/opt/flink/bin/sql-gateway.sh", "start-foreground"]
        ports:
        - containerPort: 8083
          name: sql-gateway
        resources:
          requests:
            memory: "16Gi"
            cpu: "8"
          limits:
            memory: "16Gi"
            cpu: "8"
        volumeMounts:
          - name: flink-config-volume-117
            mountPath: /opt/flink/conf/flink-conf.yaml
            subPath: flink-conf.yaml
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
            subPath: flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
            subPath: hive-exec-3.1.3.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/libfb303-0.9.3.jar
            subPath: libfb303-0.9.3.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
            subPath: antlr-runtime-3.5.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
            subPath: commons-logging-1.3.5.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/postgresql-42.5.4.jar
            subPath: postgresql-42.5.4.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.17.jar
            subPath: flink-sql-connector-kafka-3.1.0-1.17.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-table-api-java-bridge-1.17.2.jar
            subPath: flink-table-api-java-bridge-1.17.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.17.2.jar
            subPath: flink-avro-confluent-registry-1.17.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/kafka-clients-3.1.0.jar
            subPath: kafka-clients-3.1.0.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/flink-avro-1.17.2.jar
            subPath: flink-avro-1.17.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
            subPath: kafka-schema-registry-client-7.0.16.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/avro-1.10.2.jar
            subPath: avro-1.10.2.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
            subPath: jackson-databind-2.13.5.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
            subPath: jackson-core-2.13.5.jar
          - name: flink-lib-volume
            mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
            subPath: jackson-annotations-2.13.5.jar
      volumes:
      - name: flink-config-volume-117
        configMap:
          name: flink-sql-gateway-config-117
      - name: flink-lib-volume
        emptyDir: { }
---
apiVersion: v1
kind: Service
metadata:
  name: sql-gateway-service-117
  namespace: flink-learning-lab
spec:
  selector:
    app: flink-sql-gateway-117
  ports:
  - name: sql-gateway
    port: 8083
    targetPort: 8083
    nodePort: 30083
  type: NodePort
1.19.3
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-sql-gateway-config-119
  namespace: flink-learning-lab
data:
  # This file is mounted into the SQL Gateway Pod.
  config.yaml: |+
    # =================================================================
    # Flink SQL Gateway settings
    # =================================================================
    # Session timeout (e.g. close a session after 1 hour of inactivity)
    sql-gateway.session.idle-timeout: 1h
    # Interval at which session state is checked
    sql-gateway.session.check-interval: 1m
    # =================================================================
    # Flink cluster connection and general settings
    # =================================================================
    sql-gateway.endpoint.rest.address: 0.0.0.0
    sql-gateway.endpoint.rest.port: 8083
    
    jobmanager.rpc.address: flink-learning-lab-cluster-119-rest.flink-learning-lab.svc.cluster.local
    jobmanager.rpc.port: 6123
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-sql-gateway-119
  namespace: flink-learning-lab
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-sql-gateway-119
  template:
    metadata:
      labels:
        app: flink-sql-gateway-119
    spec:
      initContainers:
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              
              cd /flink-libs
              
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.19.3/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.19.3/flink-avro-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.19.3/flink-avro-confluent-registry-1.19.3.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
      - name: sql-gateway
        image: flink:1.19.3-scala_2.12-java8
        command: ["/opt/flink/bin/sql-gateway.sh", "start-foreground"]
        ports:
        - containerPort: 8083
          name: sql-gateway
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1"
        volumeMounts:
        - name: flink-config-volume-119
          mountPath: /opt/flink/conf/config.yaml
          subPath: config.yaml
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
          subPath: flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
          subPath: hive-exec-3.1.3.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/libfb303-0.9.3.jar
          subPath: libfb303-0.9.3.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
          subPath: antlr-runtime-3.5.2.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
          subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
          subPath: commons-logging-1.3.5.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/postgresql-42.5.4.jar
          subPath: postgresql-42.5.4.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.19.jar
          subPath: flink-sql-connector-kafka-3.3.0-1.19.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/kafka-clients-3.3.0.jar
          subPath: kafka-clients-3.3.0.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/flink-avro-1.19.3.jar
          subPath: flink-avro-1.19.3.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.19.3.jar
          subPath: flink-avro-confluent-registry-1.19.3.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
          subPath: kafka-schema-registry-client-7.0.16.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/avro-1.10.2.jar
          subPath: avro-1.10.2.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
          subPath: jackson-databind-2.13.5.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
          subPath: jackson-core-2.13.5.jar
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
          subPath: jackson-annotations-2.13.5.jar
      volumes:
      - name: flink-config-volume-119
        configMap:
          name: flink-sql-gateway-config-119
      - name: flink-lib-volume
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: sql-gateway-service-119
  namespace: flink-learning-lab
spec:
  selector:
    app: flink-sql-gateway-119
  ports:
  - name: sql-gateway
    port: 8083
    targetPort: 8083
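    nodePort: 30083  # nodePort values are unique cluster-wide; pick another port if sql-gateway-service-117 is also deployed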
  type: NodePort
kubectl apply -f flink-sql-gateway-{version}.yaml
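
To confirm that the gateway is reachable, its REST endpoint can be queried at the `/v1/info` path, which should report the product name and Flink version. A small sketch, assuming curl is installed and a node IP is reachable from your machine; `gateway_info_url` is a hypothetical helper name.

```shell
# Build the URL of the SQL Gateway REST info endpoint.
# $1 = host or node IP, $2 = port (default: 8083, the in-cluster service port)
gateway_info_url() {
  local host="$1" port="${2:-8083}"
  printf 'http://%s:%s/v1/info\n' "$host" "$port"
}

# Usage (replace NODE_IP with a real node address; 30083 is the nodePort above):
#   curl -s "$(gateway_info_url "$NODE_IP" 30083)"
```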

8. Set Up the Local SQL Client Environment

Download the Flink distribution from the official download page.

After extracting the archive, edit the conf folder as follows.

  • 1.17.2

    For 1.17.2, edit flink-conf.yaml in conf.

  • 1.19.3

    For 1.19.3, edit config.yaml in conf. Starting with 1.19, the configuration file changed from flink-conf.yaml to config.yaml.

rest.port: 8081
# The rest.address below is for a Flink 1.17 cluster.
rest.address: flink-learning-lab-cluster-117-rest.flink-learning-lab.svc.cluster.local
# For 1.19, use the following instead.
# rest.address: flink-learning-lab-cluster-119-rest.flink-learning-lab.svc.cluster.local
# If the file already contains a rest.address entry set to localhost, delete it.

Add ${FLINK_HOME}/bin to your PATH on the local machine.
This makes the SQL Client easier to use.
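
One way to add the directory to PATH without duplicating it on every shell start is sketched below. The helper name `add_flink_to_path` is hypothetical, and FLINK_HOME is assumed to point at the directory where the archive was extracted.

```shell
# Prepend "$1/bin" to PATH unless it is already present.
add_flink_to_path() {
  local flink_home="$1"
  case ":${PATH}:" in
    *":${flink_home}/bin:"*) ;;                  # already on PATH, nothing to do
    *) PATH="${flink_home}/bin:${PATH}" ;;
  esac
  export PATH
}

# Usage (for example in ~/.bashrc or ~/.zshrc):
#   add_flink_to_path "$FLINK_HOME"
#   sql-client.sh
```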

9. Test a Flink Session Job

sql
SET 'execution.runtime-mode' = 'streaming';
SET 'pipeline.name' = 'topic_name';

CREATE TABLE IF NOT EXISTS `kafka_source`(
--     `header` ROW<col1 STRING, col2 INTEGER, col3 STRING, col4 STRING>,
--     `policy` ROW<col1 ARRAY<ROW<col1_1 BIGINT, col1_2 BIGINT, col1_3 BIGINT, col1_4 STRING>>>
    `value` string
) WITH (
    'connector'='kafka',
    'topic'='topic_name',
    'properties.bootstrap.servers'='bootstrapservers(ip:port,ip2:port2...)',
    'properties.group.id'='topic_name-flink',
    'scan.startup.mode'='earliest-offset',
    'value.format'='json',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pass";'
);

CREATE TABLE IF NOT EXISTS `kafka_target`(
--     `header` ROW<tid STRING, messageType INTEGER, contentType STRING, brand STRING>,
--     `policy` ROW<triggerPolicies ARRAY<ROW<id BIGINT, startTime BIGINT, endTime BIGINT, utcOffset STRING>>>
    `value` string
) WITH (
'connector'='kafka',
'topic'='DEV-FLINK-ONBOARDING',
'properties.bootstrap.servers'='target_bootstrapservers(ip:port,ip2:port2...)',
'value.format'='json',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pass";'
);

INSERT INTO kafka_target SELECT * FROM kafka_source LIMIT 1;