AWS Managed Apache Flink - Handling the BeamPythonFunctionRunner Error When Using the DataStream API

김재민 · September 23, 2024

flink


Spec


  • macOS on M1 (Apple Silicon)
  • Python: 3.11
  • Apache Flink: 1.19.0
  • AWS Managed Apache Flink: 1.19

Background


  • I read data from AWS MSK (Kafka) using the DataStream API's FlinkKafkaConsumer,
  • implemented a custom class extending ProcessFunction to apply the preprocessing rules,
  • and split the data stream using side-output tags.
  • One branch was converted to a table with the Table API's create_temporary_view() method and sunk to S3 through the Filesystem connector.
  • The other branch was sunk to S3 with the DataStream API's StreamingFileSink.
  • The job ran without any problems locally, but once uploaded to AWS Managed Apache Flink, it failed with a Beam runner error.

Error message


2024-09-20 13:01:18
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
	at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:112)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:430)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:535)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:115)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:190)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:143)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 183, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
AttributeError: 'NoneType' object has no attribute 'get'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 712, in process_bundle
    self.bundle_processor_cache.discard(instruction_id)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 564, in discard
    processor.shutdown()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1265, in shutdown
    op.teardown()
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 159, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 160, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 162, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
AttributeError: 'NoneType' object has no attribute 'values'

	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:522)
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:615)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
	... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 183, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
AttributeError: 'NoneType' object has no attribute 'get'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 712, in process_bundle
    self.bundle_processor_cache.discard(instruction_id)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 564, in discard
    processor.shutdown()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1265, in shutdown
    op.teardown()
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 159, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 160, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 162, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
AttributeError: 'NoneType' object has no attribute 'values'

	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:329)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:314)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 more

Resolution (and how I got there)


  • While searching for the error, I found the https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-18.html page,
  • suspected a Beam-related JAR dependency problem, declared the Beam runner and related JARs in pom.xml, and deployed the zip file built from it.
  • However, the job still failed, either with the same error message or with a different one.
  • Later, I checked the page for version 1.19, https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-19.html, which states:
    "There is no compatible Apache Flink Runner for Flink 1.19. For more information, see Flink Version Compatibility"
  • Since I was not building the app with Beam, I doubted this was relevant, but I tried downgrading the Apache Flink runtime to 1.18 and reran the job anyway.
  • Sure enough, the same error message came back.
  • After a lot of trial and error, including searching GitHub repositories, I looked through the apache_beam directory under site-packages, where my local Python packages are installed.
  • The pyflink directory contained JAR files for every part that needed dependencies,
  • but the apache_beam directory contained only files with the .py, .pxd, and .pyx extensions.
  • That made me wonder: could the problem be a mismatch between apache-beam==2.48.0, which had been installed locally as a dependency of pip install apache-flink==1.19.0, and the apache-beam version installed on the AWS Managed Apache Flink instance?
  • So I turned the pip list of the virtual environment I had used locally into requirements.txt and passed its path as the requirements_file_path argument of the set_python_requirements method on the StreamExecutionEnvironment object.
  • The subsequent run logs suggested that packages already at the same version were taken from the cache, while the others were reinstalled and overwritten.
  • In the end, the app ran normally.
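The usual way to turn a virtualenv's package list into requirements.txt is `python3 -m pip freeze > requirements.txt`. As a minimal sketch of the same idea, the pinned `name==version` lines can also be produced with the standard library's `importlib.metadata`. The helper names `normalize`, `pinned_requirements`, and `write_requirements` and the optional `only` filter are hypothetical; unlike pip, this does not handle editable or VCS installs.

```python
import re
from importlib.metadata import distributions
from typing import Iterable, Optional


def normalize(name: str) -> str:
    """PEP 503-style name normalization (e.g. 'Apache_Beam' -> 'apache-beam')."""
    return re.sub(r"[-_.]+", "-", name).lower()


def pinned_requirements(only: Optional[Iterable[str]] = None) -> list[str]:
    """Return sorted 'name==version' lines for the distributions installed here.

    If `only` is given, keep just those packages (compared after normalization).
    """
    wanted = {normalize(n) for n in only} if only is not None else None
    pins: dict[str, str] = {}
    for dist in distributions():
        name = dist.metadata.get("Name")
        if not name:
            continue  # skip entries with broken metadata
        key = normalize(name)
        if wanted is not None and key not in wanted:
            continue
        pins[key] = f"{name}=={dist.version}"  # dict also drops duplicate entries
    return sorted(pins.values(), key=str.lower)


def write_requirements(path: str, only: Optional[Iterable[str]] = None) -> None:
    """Write one pinned requirement per line to `path`."""
    with open(path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(pinned_requirements(only)) + "\n")


if __name__ == "__main__":
    write_requirements("requirements.txt")
```

Run it with the local virtualenv activated so the pins match what the job was developed against.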

🏷️ My pip list, for reference

$ python3 -m pip list | grep apache
>>>
apache-beam            2.48.0
apache-flink           1.19.1
apache-flink-libraries 1.19.1
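To check whether another Python environment matches the versions above, a small `importlib.metadata` check is enough. This is a sketch: `EXPECTED` simply copies the local pip list shown here as a baseline, and it says nothing about which versions AWS Managed Apache Flink actually ships. The helper names are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

# Baseline taken from the local `pip list` above (example values only).
EXPECTED = {
    "apache-beam": "2.48.0",
    "apache-flink": "1.19.1",
    "apache-flink-libraries": "1.19.1",
}


def installed_version(name: str) -> Optional[str]:
    """Return the installed version of `name`, or None if it is not installed."""
    try:
        return version(name)
    except PackageNotFoundError:
        return None


def find_mismatches(expected: dict[str, str]) -> dict[str, tuple[str, Optional[str]]]:
    """Map each package whose installed version differs to (expected, installed)."""
    result = {}
    for name, want in expected.items():
        have = installed_version(name)
        if have != want:
            result[name] = (want, have)
    return result


if __name__ == "__main__":
    for name, (want, have) in find_mismatches(EXPECTED).items():
        print(f"{name}: expected {want}, installed {have or 'not installed'}")
```

An empty result means every listed package is installed at exactly the expected version.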

🏷️ site-packages path, for reference
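The comparison described above (JAR files under pyflink, only .py/.pxd/.pyx files under apache_beam) can be repeated with a short script that tallies file extensions in an installed package's directory. A sketch only: `package_dir` and `extension_counts` are hypothetical helpers, and the results depend on which wheels are installed on your machine.

```python
from collections import Counter
from importlib.util import find_spec
from pathlib import Path
from typing import Optional


def package_dir(name: str) -> Optional[Path]:
    """Return the directory of an installed top-level package, or None."""
    spec = find_spec(name)
    if spec is None or not spec.submodule_search_locations:
        return None  # not installed, or a plain module rather than a package
    return Path(next(iter(spec.submodule_search_locations)))


def extension_counts(root: Path) -> Counter:
    """Count files under `root` by suffix (e.g. '.py', '.pyx', '.jar')."""
    return Counter(p.suffix or "<none>" for p in root.rglob("*") if p.is_file())


if __name__ == "__main__":
    for pkg in ("pyflink", "apache_beam"):
        d = package_dir(pkg)
        if d is None:
            print(f"{pkg}: not installed")
            continue
        print(pkg, d)
        for ext, n in extension_counts(d).most_common():
            print(f"  {ext}: {n}")
```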

How to declare the Python dependency file path

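import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

python_source_dir = os.path.dirname(os.path.realpath(__file__))  # assumption: directory holding this script and requirements.txt
env = StreamExecutionEnvironment.get_execution_environment()  # Set flink stream environment
env.set_python_requirements(requirements_file_path="file:///" + python_source_dir + "/requirements.txt")  # works around the apache-beam dependency issue when using the DataStream API
table_env = StreamTableEnvironment.create(env)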
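If `python_source_dir` is an absolute path, `"file:///" + python_source_dir` produces four slashes (`file:////...`). That evidently still worked here, but a slightly more defensive variant builds the URI with `pathlib.Path.as_uri()` and reports a missing requirements.txt right away. This is a sketch: `requirements_uri` is a hypothetical helper, and whether `set_python_requirements` needs the `file://` form at all, rather than a plain path, is not verified here.

```python
from pathlib import Path


def requirements_uri(source_dir: str, filename: str = "requirements.txt") -> str:
    """Return a file:// URI for `filename` inside `source_dir`.

    Raises FileNotFoundError immediately so a missing requirements file is
    reported when the job is defined, not later as a dependency error.
    """
    path = (Path(source_dir) / filename).resolve()
    if not path.is_file():
        raise FileNotFoundError(f"requirements file not found: {path}")
    return path.as_uri()  # absolute path -> 'file:///...' with exactly three slashes
```

Usage: `env.set_python_requirements(requirements_file_path=requirements_uri(python_source_dir))`.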


Hello, I'm 김재민, a data engineer.
