Flink 를 쿠버네티스 위에서 사용하기 위한 전반적인 가이드를 작성합니다.
Flink Kubernetes Operator 를 활용하여 k8s 에 배포합니다.
# Check whether the flink-learning-lab namespace already exists.
kubectl get namespaces | grep flink-learning-lab
# If it is not listed, create it.
kubectl create namespace flink-learning-lab
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/
Flink Kubernetes Operator v1.11 이상 부터 Flink v2.0 을 지원합니다.
1.12 버전 문서 반영 PR : https://github.com/apache/flink-kubernetes-operator/pull/1027
1.11 버전 문서 반영 PR : https://github.com/apache/flink-kubernetes-operator/pull/1028
# Install cert-manager first (the operator quick-start installs it ahead of the operator chart).
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

# Single source of truth for the operator release, used by both the Helm repo URL
# and the archive name so the two cannot drift apart.
# NOTE(review): the guide previously mixed 1.12.0 (repo URL) and 1.12.1 (archive name);
# 1.12.1 is assumed here - confirm which release is intended.
OPERATOR_VERSION=1.12.1

# `helm repo add` needs both a repo name and a URL; the URL-less form fails.
helm repo add flink-operator-repo "https://downloads.apache.org/flink/flink-kubernetes-operator-${OPERATOR_VERSION}/"
helm pull flink-operator-repo/flink-kubernetes-operator

# Unpack the chart into ./flink-kubernetes-operator so its conf/ files can be edited before install.
tar -zvxf "flink-kubernetes-operator-${OPERATOR_VERSION}-helm.tgz"
conf/flink-conf.yaml 을 아래와 같이 수정합니다.
flink-conf.yaml: 사실 테스트 목적이라면 변경할 내용이 딱히 없습니다.
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Operator default configuration (conf/flink-conf.yaml inside the extracted Helm chart).
# Lines starting with "# kubernetes.operator..." are commented-out settings and are not applied.
# Flink job/cluster related configs
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
# These parameters are required for Java 17 support.
# Flink 1.18 uses env.java.opts.all, if a user supplies their own version of these opts in their FlinkDeployment the options below will be overridden.
# env.java.default-opts.all is used for 1.19 onwards so users can supply their own opts.all in their Job deployments and have these appended.
kubernetes.operator.default-configuration.flink-version.v1_18.env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
kubernetes.operator.default-configuration.flink-version.v1_19+.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
# Flink operator related configs
# kubernetes.operator.reconcile.interval: 60 s
# kubernetes.operator.reconcile.parallelism: 5
# kubernetes.operator.flink.client.cancel.timeout: 1 min
# kubernetes.operator.resource.cleanup.timeout: 60 s
# kubernetes.operator.observer.rest-ready.delay: 10 s
# kubernetes.operator.observer.progress-check.interval: 10 s
# kubernetes.operator.observer.savepoint.trigger.grace-period: 10 s
# kubernetes.operator.flink.client.timeout: 10 s
# kubernetes.operator.deployment.rollback.enabled: false
# kubernetes.operator.deployment.readiness.timeout: 5min
# kubernetes.operator.user.artifacts.base.dir: /opt/flink/artifacts
# kubernetes.operator.job.upgrade.ignore-pending-savepoint: false
# kubernetes.operator.watched.namespaces: ns1,ns2
# kubernetes.operator.label.selector: flink=enabled
# kubernetes.operator.dynamic.namespaces.enabled: false
# kubernetes.operator.retry.initial.interval: 5 s
# kubernetes.operator.retry.interval.multiplier: 2
# kubernetes.operator.retry.max.attempts: 10
# kubernetes.operator.exception.stacktrace.enabled: false
# kubernetes.operator.exception.stacktrace.max.length: 2048
# kubernetes.operator.exception.field.max.length: 2048
# kubernetes.operator.exception.throwable.list.max.count: 2
# kubernetes.operator.exception.label.mapper: Job has already been submitted:duplicatedJobFound,Server returned HTTP response code:httpResponseCodeFound
# The next three settings are active (uncommented): leader election is switched on
# and uses the lease name given below.
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
# NOTE(review): confirm this key is recognized; Flink documents the option as
# `execution.shutdown-on-application-finish` (no `kubernetes.` prefix).
kubernetes.execution.shutdown-on-application-finish: true
# kubernetes.operator.snapshot.resource.enabled: true
# kubernetes.operator.savepoint.dispose-on-delete: true
# kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
# kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
# Install the operator from the locally extracted chart directory into its own namespace,
# creating that namespace when it does not exist yet.
helm install flink-kubernetes-operator ./flink-kubernetes-operator \
  --namespace flink-kubernetes-operator \
  --create-namespace
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/operations/rbac/
# ServiceAccount in the flink-learning-lab namespace; the FlinkDeployment manifests
# in this guide reference it through `serviceAccount: flink-sa`.
apiVersion: v1
kind: ServiceAccount
metadata:
  # name/namespace must be nested under metadata for the object to be valid.
  name: flink-sa
  namespace: flink-learning-lab
RBAC
# Namespaced Role granting access to pods, configmaps, deployments and
# FlinkDeployment resources inside flink-learning-lab.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.0.1
  namespace: flink-learning-lab
  name: flink-role
rules:
  # Core API group (""): full access to pods and configmaps.
  - apiGroups:
      - ""
    resources:
      - pods
      - configmaps
    verbs:
      - '*'
  # apps API group: full access to deployments.
  - apiGroups:
      - apps
    resources:
      - deployments
    verbs:
      - '*'
  # Flink operator CRD: explicit verb list instead of a wildcard.
  - apiGroups: ["flink.apache.org"]
    resources: ["flinkdeployments"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
# Binds the flink-role Role to the flink-sa ServiceAccount in flink-learning-lab.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding
  namespace: flink-learning-lab
subjects:
  - kind: ServiceAccount
    name: flink-sa
    namespace: flink-learning-lab
roleRef:
  kind: Role
  name: flink-role
  apiGroup: rbac.authorization.k8s.io
# Apply the ServiceAccount / Role / RoleBinding manifests.
# NOTE(review): `rbac` is presumably the file or directory holding those manifests - confirm.
kubectl apply -f rbac
# Flink 1.17 cluster managed by the Flink Kubernetes Operator in standalone mode.
# No `job` section is defined, so jobs are submitted to the running cluster.
# NOTE(review): nesting was reconstructed; podTemplate is placed at spec level so the
# extra jars reach both JobManager and TaskManager pods - confirm.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-learning-lab-cluster-117
  namespace: flink-learning-lab
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    # flinkConfiguration values are strings, so booleans and numbers are quoted.
    taskmanager.numberOfTaskSlots: "16"
    web.submit.enable: "true"
    web.cancel.enable: "true"
  serviceAccount: flink-sa
  jobManager:
    replicas: 1
    resource:
      memory: 32G
      cpu: 8
  taskManager:
    replicas: 5
    resource:
      memory: 64G
      cpu: 16
  podTemplate:
    spec:
      initContainers:
        # Downloads connector/format jars into a shared emptyDir before Flink starts.
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              # Stop at the first failed download so the pod does not start with missing jars.
              set -e
              cd /flink-libs
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.2/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/1.17.2/flink-table-api-java-bridge-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.17.2/flink-avro-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.17.2/flink-avro-confluent-registry-1.17.2.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Seoul
          # Each downloaded jar is mounted individually (subPath) into /opt/flink/lib,
          # leaving the jars already shipped in the image untouched.
          volumeMounts:
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.17.jar
              subPath: flink-sql-connector-kafka-3.1.0-1.17.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-table-api-java-bridge-1.17.2.jar
              subPath: flink-table-api-java-bridge-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.17.2.jar
              subPath: flink-avro-confluent-registry-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.1.0.jar
              subPath: kafka-clients-3.1.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.17.2.jar
              subPath: flink-avro-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      imagePullSecrets:
        - name: harbor-creds
      volumes:
        # Scratch volume shared between the init container and the Flink container.
        - name: flink-lib-volume
          emptyDir: {}
  mode: standalone
1.19.3
# Flink 1.19 cluster managed by the Flink Kubernetes Operator in standalone mode.
# No `job` section is defined, so jobs are submitted to the running cluster.
# NOTE(review): nesting was reconstructed; podTemplate is placed at spec level so the
# extra jars reach both JobManager and TaskManager pods - confirm.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-learning-lab-cluster-119
  namespace: flink-learning-lab
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    # flinkConfiguration values are strings, so booleans and numbers are quoted.
    taskmanager.numberOfTaskSlots: "16"
    web.submit.enable: "true"
    web.cancel.enable: "true"
  serviceAccount: flink-sa
  jobManager:
    replicas: 1
    resource:
      memory: 32G
      cpu: 8
  taskManager:
    replicas: 5
    resource:
      memory: 64G
      cpu: 16
  podTemplate:
    spec:
      initContainers:
        # Downloads connector/format jars into a shared emptyDir before Flink starts.
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              # Stop at the first failed download so the pod does not start with missing jars.
              set -e
              cd /flink-libs
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.19.3/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              # Only the flink-sql-connector-kafka jar is downloaded; the plain
              # flink-connector-kafka 3.3.0-1.19 jar is not used.
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.19.3/flink-avro-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.19.3/flink-avro-confluent-registry-1.19.3.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Seoul
          # Each downloaded jar is mounted individually (subPath) into /opt/flink/lib,
          # leaving the jars already shipped in the image untouched.
          volumeMounts:
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.19.jar
              subPath: flink-sql-connector-kafka-3.3.0-1.19.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.3.0.jar
              subPath: kafka-clients-3.3.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.19.3.jar
              subPath: flink-avro-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.19.3.jar
              subPath: flink-avro-confluent-registry-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      imagePullSecrets:
        - name: harbor-creds
      volumes:
        # Scratch volume shared between the init container and the Flink container.
        - name: flink-lib-volume
          emptyDir: {}
  mode: standalone
# Apply the FlinkDeployment manifest.
# NOTE(review): `{version}` looks like a placeholder for the file suffix you saved the manifest under - confirm.
kubectl apply -f flink-session-cluster-{version}.yaml
# Configuration for the Flink 1.17 SQL Gateway pod.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-sql-gateway-config-117
  namespace: flink-learning-lab
data:
  # This file is mounted into the SQL Gateway pod.
  # The cluster is addressed through rest.address / rest.port because the target is the
  # cluster's "-rest" service; this replaces jobmanager.rpc.address / jobmanager.rpc.port: 6123.
  # NOTE(review): 8081 is Flink's default REST port - confirm the "-rest" service exposes it.
  flink-conf.yaml: |+
    # =================================================================
    # Flink SQL Gateway 관련 설정
    # =================================================================
    # 세션 타임아웃 설정 (예: 1시간 동안 활동이 없으면 세션 종료)
    sql-gateway.session.idle-timeout: 1h
    # 세션 상태를 주기적으로 체크하는 간격
    sql-gateway.session.check-interval: 1m
    sql-gateway.endpoint.rest.address: 0.0.0.0
    sql-gateway.endpoint.rest.port: 8083
    # =================================================================
    # Flink 클러스터 연결 및 일반 설정
    # =================================================================
    # 이 설정은 Deployment의 command-line 인수로도 전달할 수 있습니다.
    # 둘 다 지정된 경우 command-line 인수가 우선합니다.
    rest.address: flink-learning-lab-cluster-117-rest.flink-learning-lab.svc.cluster.local
    rest.port: 8081
---
# Flink 1.17 SQL Gateway, run as a single-replica Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-sql-gateway-117
  namespace: flink-learning-lab
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-sql-gateway-117
  template:
    metadata:
      labels:
        app: flink-sql-gateway-117
    spec:
      initContainers:
        # Downloads connector/format jars into a shared emptyDir before the gateway starts.
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              # Stop at the first failed download so the pod does not start with missing jars.
              set -e
              cd /flink-libs
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.2/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/1.17.2/flink-table-api-java-bridge-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.17.2/flink-avro-1.17.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.17.2/flink-avro-confluent-registry-1.17.2.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: sql-gateway
          image: flink:1.17.2-scala_2.12-java8
          # Runs the gateway in the foreground so the container lives as long as the process.
          command: ["/opt/flink/bin/sql-gateway.sh", "start-foreground"]
          ports:
            - containerPort: 8083
              name: sql-gateway
          resources:
            requests:
              memory: "16Gi"
              cpu: "8"
            limits:
              memory: "16Gi"
              cpu: "8"
          volumeMounts:
            # Replaces only flink-conf.yaml inside the image's conf directory.
            - name: flink-config-volume-117
              mountPath: /opt/flink/conf/flink-conf.yaml
              subPath: flink-conf.yaml
            # Each downloaded jar is mounted individually (subPath) into /opt/flink/lib.
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.17.jar
              subPath: flink-sql-connector-kafka-3.1.0-1.17.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-table-api-java-bridge-1.17.2.jar
              subPath: flink-table-api-java-bridge-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.17.2.jar
              subPath: flink-avro-confluent-registry-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.1.0.jar
              subPath: kafka-clients-3.1.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.17.2.jar
              subPath: flink-avro-1.17.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      volumes:
        - name: flink-config-volume-117
          configMap:
            name: flink-sql-gateway-config-117
        # Scratch volume shared between the init container and the gateway container.
        - name: flink-lib-volume
          emptyDir: {}
---
# Exposes the Flink 1.17 SQL Gateway REST endpoint outside the cluster via a NodePort.
apiVersion: v1
kind: Service
metadata:
  name: sql-gateway-service-117
  namespace: flink-learning-lab
spec:
  # Matches the pod label set by the flink-sql-gateway-117 Deployment.
  selector:
    app: flink-sql-gateway-117
  ports:
    - name: sql-gateway
      port: 8083
      targetPort: 8083
      nodePort: 30083
  type: NodePort
1.19.3
# Configuration for the Flink 1.19 SQL Gateway pod.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-sql-gateway-config-119
  namespace: flink-learning-lab
data:
  # This file is mounted into the SQL Gateway pod.
  # The cluster is addressed through rest.address / rest.port because the target is the
  # cluster's "-rest" service; this replaces jobmanager.rpc.address / jobmanager.rpc.port: 6123.
  # NOTE(review): 8081 is Flink's default REST port - confirm the "-rest" service exposes it.
  config.yaml: |+
    # =================================================================
    # Flink SQL Gateway 관련 설정
    # =================================================================
    # 세션 타임아웃 설정 (예: 1시간 동안 활동이 없으면 세션 종료)
    sql-gateway.session.idle-timeout: 1h
    # 세션 상태를 주기적으로 체크하는 간격
    sql-gateway.session.check-interval: 1m
    # =================================================================
    # Flink 클러스터 연결 및 일반 설정
    # =================================================================
    sql-gateway.endpoint.rest.address: 0.0.0.0
    sql-gateway.endpoint.rest.port: 8083
    rest.address: flink-learning-lab-cluster-119-rest.flink-learning-lab.svc.cluster.local
    rest.port: 8081
---
# Flink 1.19 SQL Gateway, run as a single-replica Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-sql-gateway-119
  namespace: flink-learning-lab
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-sql-gateway-119
  template:
    metadata:
      labels:
        app: flink-sql-gateway-119
    spec:
      initContainers:
        # Downloads connector/format jars into a shared emptyDir before the gateway starts.
        - name: dependency-downloader
          image: alpine:3.18
          command:
            - /bin/sh
            - -c
            - |
              # Stop at the first failed download so the pod does not start with missing jars.
              set -e
              cd /flink-libs
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.19.3/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
              wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
              wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.3.5/commons-logging-1.3.5.jar
              wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar
              wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.19.3/flink-avro-1.19.3.jar
              wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.19.3/flink-avro-confluent-registry-1.19.3.jar
              wget https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.16/kafka-schema-registry-client-7.0.16.jar
              wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.13.5/jackson-databind-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.13.5/jackson-core-2.13.5.jar
              wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.13.5/jackson-annotations-2.13.5.jar
          volumeMounts:
            - mountPath: /flink-libs
              name: flink-lib-volume
      containers:
        - name: sql-gateway
          image: flink:1.19.3-scala_2.12-java8
          # Runs the gateway in the foreground so the container lives as long as the process.
          command: ["/opt/flink/bin/sql-gateway.sh", "start-foreground"]
          ports:
            - containerPort: 8083
              name: sql-gateway
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1"
          volumeMounts:
            # Replaces only config.yaml inside the image's conf directory.
            - name: flink-config-volume-119
              mountPath: /opt/flink/conf/config.yaml
              subPath: config.yaml
            # Each downloaded jar is mounted individually (subPath) into /opt/flink/lib.
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
              subPath: flink-sql-connector-hive-3.1.3_2.12-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/hive-exec-3.1.3.jar
              subPath: hive-exec-3.1.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/libfb303-0.9.3.jar
              subPath: libfb303-0.9.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/antlr-runtime-3.5.2.jar
              subPath: antlr-runtime-3.5.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
              subPath: flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/commons-logging-1.3.5.jar
              subPath: commons-logging-1.3.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/postgresql-42.5.4.jar
              subPath: postgresql-42.5.4.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.19.jar
              subPath: flink-sql-connector-kafka-3.3.0-1.19.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-clients-3.3.0.jar
              subPath: kafka-clients-3.3.0.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-1.19.3.jar
              subPath: flink-avro-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/flink-avro-confluent-registry-1.19.3.jar
              subPath: flink-avro-confluent-registry-1.19.3.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/kafka-schema-registry-client-7.0.16.jar
              subPath: kafka-schema-registry-client-7.0.16.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/avro-1.10.2.jar
              subPath: avro-1.10.2.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-databind-2.13.5.jar
              subPath: jackson-databind-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-core-2.13.5.jar
              subPath: jackson-core-2.13.5.jar
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/jackson-annotations-2.13.5.jar
              subPath: jackson-annotations-2.13.5.jar
      volumes:
        - name: flink-config-volume-119
          configMap:
            name: flink-sql-gateway-config-119
        # Scratch volume shared between the init container and the gateway container.
        - name: flink-lib-volume
          emptyDir: {}
---
# Exposes the Flink 1.19 SQL Gateway REST endpoint outside the cluster via a NodePort.
apiVersion: v1
kind: Service
metadata:
  name: sql-gateway-service-119
  namespace: flink-learning-lab
spec:
  # Matches the pod label set by the flink-sql-gateway-119 Deployment.
  selector:
    app: flink-sql-gateway-119
  ports:
    - name: sql-gateway
      port: 8083
      targetPort: 8083
      # A nodePort can be allocated to only one Service per cluster. 30083 is already
      # used by sql-gateway-service-117, so the 1.19 gateway gets its own port.
      nodePort: 30084
  type: NodePort
# Apply the SQL Gateway ConfigMap / Deployment / Service manifest.
# NOTE(review): `{version}` looks like a placeholder for the file suffix you saved the manifest under - confirm.
kubectl apply -f flink-sql-gateway-{version}.yaml
아래는 공식 다운로드 링크 입니다.
1.17.2 : https://downloads.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
1.19.3 : https://dlcdn.apache.org/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
압축 해제 후 conf 폴더를 아래와 같이 수정해줍니다.
1.17.2
1.17.2 는 conf 의 flink-conf.yaml 을 수정합니다.
1.19.3
1.19.3 은 conf 의 config.yaml 을 수정합니다. 1.19 버전부터 설정 파일이 flink-conf.yaml 에서 config.yaml 로 변경되었습니다.
# REST endpoint settings for the local Flink distribution's config file.
rest.port: 8081
# The rest.address below is for the Flink 1.17 cluster.
rest.address: flink-learning-lab-cluster-117-rest.flink-learning-lab.svc.cluster.local
# For the 1.19 cluster, use the following instead.
# rest.address: flink-learning-lab-cluster-119-rest.flink-learning-lab.svc.cluster.local
# If the existing file already contains a rest.address entry set to localhost, delete that entry.
Local 에서 ${FLINK_HOME}/bin 을 Path 로 포함 시켜주세요.
SQL-Client 를 쉽게 사용하기 위함 입니다.
-- Copies one record from a source Kafka topic to a target Kafka topic.
-- Replace the placeholder topic, bootstrap-server and credential values before running.

-- Flink's SET statement takes the key and the value as quoted string literals.
SET 'execution.runtime-mode' = 'streaming';
SET 'pipeline.name' = 'topic_name';

CREATE TABLE IF NOT EXISTS `kafka_source` (
    -- Example of a structured schema, kept for reference:
    -- `header` ROW<col1 STRING, col2 INTEGER, col3 STRING, col4 STRING>,
    -- `policy` ROW<col1 ARRAY<ROW<col1_1 BIGINT, col1_2 BIGINT, col1_3 BIGINT, col1_4 STRING>>>
    -- NOTE(review): with 'value.format' = 'json' this column is presumably read from a
    -- JSON field named "value" - confirm that this matches the topic payload.
    `value` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_name',
    'properties.bootstrap.servers' = 'bootstrapsevers(ip:port,ip2:port2...)',
    'properties.group.id' = 'topic_name-flink',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pass";'
);

CREATE TABLE IF NOT EXISTS `kafka_target` (
    -- Example of a structured schema, kept for reference:
    -- `header` ROW<tid STRING, messageType INTEGER, contentType STRING, brand STRING>,
    -- `policy` ROW<triggerPolicies ARRAY<ROW<id BIGINT, startTime BIGINT, endTime BIGINT, utcOffset STRING>>>
    `value` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'DEV-FLINK-ONBOARDING',
    'properties.bootstrap.servers' = 'target_bootstrapservers(ip:port,ip2:port2...)',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pass";'
);

-- Explicit column lists keep the statement valid if either table gains columns later.
INSERT INTO `kafka_target` (`value`)
SELECT `value`
FROM `kafka_source`
LIMIT 1;