
java.util.NoSuchElementException 장애가 발생하고 있었다.
2024-09-21 12:26:20
java.util.NoSuchElementException
at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1000)
at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:117)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:134)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:799)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:929)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Failed 된 것을 확인 하였고, 장애 메시지와 함께 구글링 하였다.CompactOperator 클래스를 사용한다는 것을 알았다.CompactOperator 클래스에서 이전 체크포인트의 상태 값을 get() 및 put() 하게 되는데 이 과정에서 장애가 발생한 것이라 한다.filesystem connector 를 이용하여 AWS S3 로 Json 데이터를 write 할 때 auto-compaction 을 true 로 설정하여 사용하고 있다.filesystem connector 사용 시 flink-connector-files-1.19.0.jar 을 참조하는데, flink-connector-files 문서 를 보니까 table api 에서 auto-compaction 활성화 시 org.apache.flink.connector.file.table.stream.compact 클래스를 사용하는 것 같았다.java.util.NoSuchElementException 장애가 발생한 것 같다.at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1000) 와 같은데 org.apache.flink.connector.file.table.stream.compact 소스 코드를 보면 java.util.ArrayList 를 import 하고 있다.if (context.isRestored()) {
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
}
🏷️ 나는 Java 를 잘 모르기 때문에 ChatGPT 에게 CompactorOperator 소스 코드를 학습 시키고 코드 113 라인 역할에 대해 물어보았다.
# ChatGPT
>>>
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
이 코드는 이전 체크포인트에서 저장된 상태(state)를 복원하는 역할을 합니다.
자세히 설명하면 다음과 같습니다:
- this.expiredFilesState.get(): 현재 상태를 가져옵니다. 이 상태는 Map<Long, List<Path>> 형태로 저장되어 있습니다.
- iterator().next(): 상태에서 첫 번째 요소(Map)를 가져옵니다.
- putAll(...): 가져온 Map의 모든 항목을 this.expiredFiles에 추가합니다. 이를 통해 이전에 저장된 만료된 파일(expired files) 목록을 복원하게 됩니다.
즉, 이 코드는 이전 실행에서 만료된 파일들의 목록을 복원하여 현재 expiredFiles에 저장하는 역할을 합니다.