
Table api 를 이용하여 아래 코드와 같이 connector 를 kafka 로 선언한 후 데이터 소비를 시도 하였는데,
org.apache.flink.table.api.ValidationException장애가 발생 하였다.
# Table api - kafka connector example
table_env.execute_sql("""
CREATE TABLE source_kafka_table (
id STRING,
city STRING,
number STRING,
ins_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'my-bootstrap-server:9096',
'properties.group.id' = 'flink_test_group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="my-username" password="my-password";'
)
""")
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_kafka_source'.
...
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:807)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:781)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:224)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Kafka connector 옵션으로 Kafka 소스 테이블을 생성 하려고 할 때 Flink 에서 Kafka connector 관련 클래스를 인식할 수 없어 발생한 장애이다.
flink-sql-connector-kafka jar파일을 버전에 맞게 다운로드 한 후 코드상에서TableEnvironment객체에pipeline.jars인자값으로 jar 경로를 선언하거나,$PYFLINK_HOME경로에 존재하는lib디렉토리에 직접 jar 파일을 위치 시키면 된다.
🏷️ 나의 경우 Kafka Client 3.2.0 버전을 사용하고 있었고, Apache Flink 는 1.19 버전이기 때문에 메이븐 저장소에서 flink-sql-connector-kafka-3.2.0-1.19.jar 파일을 다운로드 받았다. (MSK Kakfa 버전이 3.5.2 이지만 Kafka 는 하위 버전의 Client 와도 호환 되기 때문에 이상 없다.)
TableEnvironment 객체를 통해 설정할 수 있다.# 코드 예시
def set_jar_classpath(table_env: TableEnvironment):
''' 로컬 환경 테스트 시 Flink connector 관련 종속성 해결을 위한 jar 경로 선언하는 함수 '''
current_dir = os.path.dirname(os.path.realpath(__file__))
libs_dir = os.path.join(current_dir, "lib")
jar_files = [f"file:///{os.path.abspath(os.path.join(libs_dir, jar))}" for jar in os.listdir(libs_dir) if jar.endswith(".jar")]
jar_files_str = ";".join(jar_files)
table_env.get_config().get_configuration().set_string("pipeline.jars", jar_files_str)
for jar in jar_files_str.split(";"):
print(jar)
📌 Github 코드 참고: https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/python/S3Sink/main.py
Job manager 가 lib 경로를 스캔하여 kafka connector 이용에 필요한 java class 를 인식하게 된다.$ python3 -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"
pom.xml 에 jar 종속성 리스트를 선언한 후 mvn package 명령어로 빌드하면 된다.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
<scope>provided</scope>
</dependency>
🏷️ 위와 같이 pom.xml 내용 중 <dependency> 태그를 이용해서 종속성을 추가할 수 있다. 전체 pom.xml 예시는 https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/python/S3Sink/pom.xml 내용을 참고하면 된다.
🏷️ 해당 문서는 장애 대응 방법만 기록하기 때문에 pom.xml 파일을 mvn 명령어로 빌드 후 AWS Managed Apache Flink 앱으로 배포하는 방법은 AWS Managed Flink 앱 배포 및 모니터링 하는 방법(with. Apache Flink 1.19) 문서에서 확인할 수 있다.
📌 jar 종속성 관련 장애는 대부분 해당 장애 대응 방법으로 해결이 될 것이니 참고하길 바란다.