AWS Managed Apache Flink - Table API 으로 Kafka connector 사용 시 발생하는 org.apache.flink.table.api.ValidationException 장애 대응

김재민·2024년 9월 24일

flink

목록 보기
10/13
post-thumbnail

Spec


  • Python: 3.11
  • Apache Flink: 1.19.0 (pyflink 사용)
  • MSK(Kafka): 3.5.2
  • Java: 11
  • Maven: 3.9.9

Background


Table api 를 이용하여 아래 코드와 같이 connector 를 kafka 로 선언한 후 데이터 소비를 시도 하였는데, org.apache.flink.table.api.ValidationException 장애가 발생 하였다.

# Table api - kafka connector example

table_env.execute_sql("""
    CREATE TABLE source_kafka_table (
        id STRING,
        city STRING,
        number STRING,
        ins_time STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test-topic',
        'properties.bootstrap.servers' = 'my-bootstrap-server:9096',
        'properties.group.id' = 'flink_test_group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'properties.security.protocol' = 'SASL_SSL',
        'properties.sasl.mechanism' = 'SCRAM-SHA-512',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="my-username" password="my-password";'
    )
""")

장애 메시지


py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_kafka_source'.
...
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:807)
        at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:781)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:224)
        ...
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

장애 원인


Kafka connector 옵션으로 Kafka 소스 테이블을 생성 하려고 할 때 Flink 에서 Kafka connector 관련 클래스를 인식할 수 없어 발생한 장애이다.


로컬 환경일 경우 장애 대응 방법


flink-sql-connector-kafka jar 파일을 버전에 맞게 다운로드 한 후 코드상에서 TableEnvironment 객체에 pipeline.jars 인자값으로 jar 경로를 선언하거나, $PYFLINK_HOME 경로에 존재하는 lib 디렉토리에 직접 jar 파일을 위치 시키면 된다.

🏷️ 나의 경우 Kafka Client 3.2.0 버전을 사용하고 있었고, Apache Flink 는 1.19 버전이기 때문에 메이븐 저장소에서 flink-sql-connector-kafka-3.2.0-1.19.jar 파일을 다운로드 받았다. (MSK Kakfa 버전이 3.5.2 이지만 Kafka 는 하위 버전의 Client 와도 호환 되기 때문에 이상 없다.)


방법1. TableEnvironment 객체에 pipeline.jars 선언하는 방법

  • AWS Managed Apache Flink 공식 문서에서 해당 방법은 로컬 테스트 하는 경우에만 사용하는 것을 권장하고 있다.
  • APP Deploy 시에는 pom.xml 을 통해 종속성을 관리할 수 있도록 안내한다.(해당 방법은 아래 추가로 기록 하겠다.)
  • 아래 코드 예시와 같이 TableEnvironment 객체를 통해 설정할 수 있다.
# 코드 예시

def set_jar_classpath(table_env: TableEnvironment):
    ''' 로컬 환경 테스트 시 Flink connector 관련 종속성 해결을 위한 jar 경로 선언하는 함수 '''
	current_dir = os.path.dirname(os.path.realpath(__file__))
    libs_dir = os.path.join(current_dir, "lib")
    jar_files = [f"file:///{os.path.abspath(os.path.join(libs_dir, jar))}" for jar in os.listdir(libs_dir) if jar.endswith(".jar")]
    jar_files_str = ";".join(jar_files)

    table_env.get_config().get_configuration().set_string("pipeline.jars", jar_files_str)

    for jar in jar_files_str.split(";"):
        print(jar)

📌 Github 코드 참고: https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/python/S3Sink/main.py


  • 아래 명령어를 이용하여 PYFLINK_HOME 경로를 찾은 후 lib 디렉토리에 jar 파일을 직접 위치 시키면 된다.
  • 위치 시킨 후 pyflink 앱을 실행 시키면 Job manager 가 lib 경로를 스캔하여 kafka connector 이용에 필요한 java class 를 인식하게 된다.
$ python3 -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"


pom.xml 에 jar 종속성 리스트를 선언한 후 mvn package 명령어로 빌드하면 된다.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>3.2.0-1.19</version>
    <scope>provided</scope>
</dependency>

🏷️ 위와 같이 pom.xml 내용 중 <dependency> 태그를 이용해서 종속성을 추가할 수 있다. 전체 pom.xml 예시는 https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/python/S3Sink/pom.xml 내용을 참고하면 된다.
🏷️ 해당 문서는 장애 대응 방법만 기록하기 때문에 pom.xml 파일을 mvn 명령어로 빌드 후 AWS Managed Apache Flink 앱으로 배포하는 방법은 AWS Managed Flink 앱 배포 및 모니터링 하는 방법(with. Apache Flink 1.19) 문서에서 확인할 수 있다.


📌 jar 종속성 관련 장애는 대부분 해당 장애 대응 방법으로 해결이 될 것이니 참고하길 바란다.

profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글