AWS Managed Apache Flink - Parallelism scaling 할 때 발생하는 java.util.NoSuchElementException 장애 대응

김재민·2024년 9월 21일

flink

목록 보기
6/13
post-thumbnail

Spec


  • Flink: 1.19.0
  • Python: 3.11
  • Java: 11

장애 상황


  • AWS Managed Apache Flink 앱 배포 후 자동 크기 조정(Auto Scaling)을 활성화 하였는데, 병렬성 조정 된 이후 Apache Flink Dashboard 를 확인 해보니 java.util.NoSuchElementException 장애가 발생하고 있었다.

장애 상태 이미지



장애 메시지


2024-09-21 12:26:20
java.util.NoSuchElementException
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1000)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:117)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:134)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:799)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:929)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.base/java.lang.Thread.run(Thread.java:829)

장애 원인 분석


  • Flink dashboard 를 보니 Pyflink 앱의 compact-operator 단계에서 Failed 된 것을 확인 하였고, 장애 메시지와 함께 구글링 하였다.
  • FLINK-31689 Jira 이슈를 통해 데이터 압축 시 CompactOperator 클래스를 사용한다는 것을 알았다.
  • Flink 의 병렬성이 조정될 경우 CompactOperator 클래스에서 이전 체크포인트의 상태 값을 get()put() 하게 되는데 이 과정에서 장애가 발생한 것이라 한다.
  • 내가 개발한 PyFlink App 은 Table api 에서 filesystem connector 를 이용하여 AWS S3 로 Json 데이터를 write 할 때 auto-compactiontrue 로 설정하여 사용하고 있다.
  • filesystem connector 사용 시 flink-connector-files-1.19.0.jar 을 참조하는데, flink-connector-files 문서 를 보니까 table api 에서 auto-compaction 활성화 시 org.apache.flink.connector.file.table.stream.compact 클래스를 사용하는 것 같았다.
  • 해당 클래스에서 역시 CompactorOperator 를 참조한다.
  • 그래서 내가 배포한 PyFlink App 의 병렬성이 조정 되면서 이전 체크포인트 상태값과 호환 되지 못하고 java.util.NoSuchElementException 장애가 발생한 것 같다.
  • 장애 메시지 첫 줄을 보면 at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1000) 와 같은데 org.apache.flink.connector.file.table.stream.compact 소스 코드를 보면 java.util.ArrayList 를 import 하고 있다.
  • 그리고 코드 113 라인을 보면 다음고 같다.
if (context.isRestored()) {
    this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
}

🏷️ 나는 Java 를 잘 모르기 때문에 ChatGPT 에게 CompactorOperator 소스 코드를 학습 시키고 코드 113 라인 역할에 대해 물어보았다.

# ChatGPT
>>>
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next()); 
이 코드는 이전 체크포인트에서 저장된 상태(state)를 복원하는 역할을 합니다. 
자세히 설명하면 다음과 같습니다:

- this.expiredFilesState.get(): 현재 상태를 가져옵니다. 이 상태는 Map<Long, List<Path>> 형태로 저장되어 있습니다.
- iterator().next(): 상태에서 첫 번째 요소(Map)를 가져옵니다.
- putAll(...): 가져온 Map의 모든 항목을 this.expiredFiles에 추가합니다. 이를 통해 이전에 저장된 만료된 파일(expired files) 목록을 복원하게 됩니다.

즉, 이 코드는 이전 실행에서 만료된 파일들의 목록을 복원하여 현재 expiredFiles에 저장하는 역할을 합니다.
  • 역시, 병렬성이 스케일링 되면서 이전 상태값을 가져오는 과정에서 장애가 발생한 것이다.
  • 이 장애를 해결하기 위해 https://github.com/apache/flink/pull/22400 에 PR 이 올라와 있지만 아직 병합 되지 않은 것 같다.

지금까지의 대응 가능한 방법 기록


  1. auto-scaling 을 포기하지 못할 경우: table api 에서 filesystem connector 사용 시 auto-compaction 속성을 false 로 사용한다.
  2. auto-scaling 을 포기해도 될 경우: auto-scaling 설정을 disable 하여 사용한다.(auto-compaction 값은 true 로 사용한다)
  3. https://github.com/apache/flink/blob/761d22978d6b04924c4bb2fdf8bffacfe920500d/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java 아직 pr 이 완료되지 않았지만 해당 CompactOperator.java 파일을 이용하여 새로 빌드한 jar 를 사용한다.
profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글