
2024-09-20 13:01:18
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:430)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:535)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:115)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:190)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:143)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 183, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
AttributeError: 'NoneType' object has no attribute 'get'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 712, in process_bundle
self.bundle_processor_cache.discard(instruction_id)
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 564, in discard
processor.shutdown()
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1265, in shutdown
op.teardown()
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 159, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 160, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 162, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
AttributeError: 'NoneType' object has no attribute 'values'
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:522)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:615)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 183, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
AttributeError: 'NoneType' object has no attribute 'get'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 712, in process_bundle
self.bundle_processor_cache.discard(instruction_id)
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 564, in discard
processor.shutdown()
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1265, in shutdown
op.teardown()
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 159, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 160, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 162, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.teardown
AttributeError: 'NoneType' object has no attribute 'values'
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:329)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:314)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 more
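- While searching for the cause, I read https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-18.html
- and suspected a Beam-related jar dependency problem. To resolve the dependencies, I declared the Beam runner and related jar files in pom.xml, built the zip package, and deployed it.
- However, I got either the same error message or a different one, and the app still did not work.
- Later, the documentation for https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-19.html stated: "There is no compatible Apache Flink Runner for Flink 1.19. For more information, see Flink Version Compatibility".
- I was not building the app with Beam, so I doubted this was relevant, but I tried downgrading the Apache Flink runtime version to 1.18 and running the app again.
- Sure enough, it returned the same error message.
- After a lot of trial and error, including searching GitHub repositories, I inspected the contents of the apache_beam directory under site-packages, the local Python package path.
- The pyflink directory contained jar files for everything that needed dependencies,
- but the apache_beam directory contained only files with the .py, .pxd, and .pyx extensions.
- So I wondered whether the problem was a version mismatch: apache-beam==2.48.0, which was installed locally as a dependency of pip install apache-flink==1.19.0, might differ from the apache-beam version installed on the AWS Managed Apache Flink instance.
- To test this, I exported the pip list of the local virtual environment I had been working in to requirements.txt, and passed its path as the requirements_file_path argument of the set_python_requirements method on the StreamExecutionEnvironment object.
- The execution logs then suggested that packages whose versions already matched were reused from the cache, while the others were reinstalled and overwritten.
- As a result, the app ran normally.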
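`python3 -m pip freeze > requirements.txt` is the usual way to capture a whole environment. If you only want to pin the few packages involved here, the minimal sketch below turns `pip list`-style output into `name==version` lines. `pip_list_to_requirements` and its `prefix` filter are hypothetical helpers (not part of pip or PyFlink), and the sample input reuses the versions from my local environment.

```python
# Minimal sketch: turn `pip list` output into pinned requirements lines.
# Assumption: the input uses pip's default column layout ("name  version ...").


def pip_list_to_requirements(pip_list_output, prefix=None):
    """Return ["name==version", ...] parsed from `pip list` text.

    Header and separator rows ("Package  Version", "-------  -------") are skipped.
    If `prefix` is given, only packages whose name starts with it are kept.
    """
    requirements = []
    for line in pip_list_output.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue  # blank line or anything that is not "name version"
        name, version = parts[0], parts[1]
        if name == "Package" or set(name) == {"-"}:
            continue  # header or separator row printed by pip
        if prefix is not None and not name.startswith(prefix):
            continue
        requirements.append(f"{name}=={version}")
    return requirements


sample = """\
apache-beam 2.48.0
apache-flink 1.19.1
apache-flink-libraries 1.19.1
"""

# One requirement per line, ready to be written to requirements.txt
print("\n".join(pip_list_to_requirements(sample, prefix="apache-")))
```

Writing the returned lines to a file, one per line, produces a requirements file in the format pip accepts.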
🏷️ My pip list, for reference
$ python3 -m pip list | grep apache
>>>
apache-beam 2.48.0
apache-flink 1.19.1
apache-flink-libraries 1.19.1
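To compare the local environment with what the job actually sees, a minimal sketch like the following prints the installed versions of the relevant distributions using the standard library's `importlib.metadata`. It is an assumption, not something I verified, that output printed from the entry script ends up in the job logs on AWS Managed Apache Flink. It also only reports the environment of the process that runs it, which may differ from the Python workers that execute the UDFs. `installed_versions` is a hypothetical helper name.

```python
# Minimal sketch: report which versions of the relevant packages are installed
# in the current Python environment (None means "not installed").
from importlib import metadata

PACKAGES = ("apache-beam", "apache-flink", "apache-flink-libraries")


def installed_versions(names=PACKAGES):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```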
🏷️ site-packages path, for reference

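The site-packages inspection described earlier can also be scripted. This minimal sketch locates an installed package with `importlib.util.find_spec` and counts its files by extension, which makes it easy to confirm whether a package such as apache_beam ships any `.jar` files. `package_dir` and `extension_counts` are hypothetical helper names.

```python
# Minimal sketch: count file extensions inside an installed package directory,
# e.g. to check whether site-packages/apache_beam contains any .jar files.
import importlib.util
import os
from collections import Counter


def package_dir(module_name):
    """Return the directory of an importable top-level package, or None."""
    spec = importlib.util.find_spec(module_name)
    if spec is None or not spec.submodule_search_locations:
        return None  # not installed, or a plain module rather than a package
    return list(spec.submodule_search_locations)[0]


def extension_counts(root):
    """Count files under `root` by extension ('' for files without one)."""
    counts = Counter()
    for _dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            counts[os.path.splitext(filename)[1]] += 1
    return counts


if __name__ == "__main__":
    for module in ("apache_beam", "pyflink"):
        root = package_dir(module)
        if root is None:
            print(f"{module}: not installed")
            continue
        counts = extension_counts(root)
        print(f"{module} ({root}): {counts.get('.jar', 0)} .jar files, "
              f"most common extensions {counts.most_common(5)}")
```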
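import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
python_source_dir = os.path.dirname(os.path.realpath(__file__))  # assumption: requirements.txt sits next to this script
# Set flink stream environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_python_requirements(requirements_file_path="file:///" + python_source_dir + "/requirements.txt")  # works around the apache-beam dependency issue when using the DataStream API
table_env = StreamTableEnvironment.create(env)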