생산 장비의 Deep Learning 분석을 적용할 장비를 확대했다. (1 -> 5)
Source 는 NIFI 이며 Kafka 에 데이터를 담고 Spark Structured Streaming 으로 SINK 하고 있는 환경이다.
이 때 Spark 에서 offset을 commit 하지 못하고 Application 이 죽는 현상이 발생 했다.
Spark Structured Streaming 어플리케이션을 다시 한 번 띄웠지만 여전히 offset 을 commit 하지 못하고 Application 이 죽었다.
Application 실행 시 executor 들이 죽었다가 다시 한 번 살아나고 이 후 아래 로그를 뱉고 꺼진다.
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Spark Driver 의 Memory 할당 문제
수집하는 데이터가 이미지이며, 해당 데이터를 건 당 FTP로 저장해줄 필요가 있어 dataframe 을 collect 해 주고 있다. 여기서 메모리를 많이 사용하는 것으로 예상 된다.
Spark 에서 Driver Memory 할당 문제로 보여 아래와 같이 단기적으로 많은 오프셋을 대응해보기 위해 memory 할당을 했다.
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-cores 8 \
--driver-memory 80G \
...
그러나 똑같은 현상으로 같은 에러 로그를 만났다.
어디가 문제일까?
아래의 질의응답에서 답을 얻을 수 있었다.
그러나 드라이버 자체의 memory 를 늘렸는데도 왜 이런 에러를 만나는 걸까?
maxResultSize 어떤 것이 다른지 확인이 필요하다.
아래 링크에서 maxResultSize 에 대한 설명을 찾아볼 수가 있었다.
"As per the official Spark documentation, spark.driver.maxResultSize defines the maximum limit of the total size of the serialized result that a driver can store for each Spark collect action (data in bytes). Sometimes this property also helps in the performance tuning of Spark Application. For more details on this property refer to Spark Configuration."
maxResultSize 는 드라이버가 각 Spark 수집 작업(바이트 단위 데이터)에 대해 저장할 수 있는 직렬화된 결과의 총 크기에 대한 최대 제한 값이라고 한다.
그러므로 collect 시 발생한 데이터 사이즈가 기본 값 1G 를 넘어기 때문에 발생하는 에러 이므로 driver 의 메모리와는 상관 없이 에러가 발생하는 것으로 보인다.
우선에는 기존에 사용하던 kafka-connect API 를 활용해서 에러가 난 offset 부터 시작하여 데이터를 천천히 처리했다.
이 후 Spark Application 에 maxOffsetsPerTrigger
옵션을 지정한 후 기동 시켰다.
글의 흐름 상 conf.set("spark.driver.maxResultSize", "<X>g")
옵션에서 1G 이상 Memory Size 를 증가 시키는게 방법이겠지만, 이게 원초적인 방안이 된다고 생각하지 않았다.
아래와 같은 이유에서 이런 판단을 하게 됐다.
그렇다면 여기서 정말 근본적인 해결책은 무엇일까?
maxResultSize
를 늘리는 것 보다는 maxOffsetsPerTrigger
를 적당히 지정하여 Backpressure 에 대비하는 것이 훨씬 근본적인 해결책이 된다.
물론 Dynamic allocation 을 활성화 시켜 탄력적으로 늘리는 방법도 있겠지만, 기본적으로 수집 때 DataFrame 을 collect 하게 되고 이는 executor 들에 부하를 주는 문제가 아니고 maxResultSize
를 늘려주는 것도 아니기 때문에 근본적인 해결책이 될 수 없을거라고 판단했다.
다만 maxOffsetsPerTrigger
는 몇가지 문제가 있어 충분히 고려해볼만한 문제가 발생한다.
maxOffsetsPerTrigger
는 Spark Structured Streaming 옵션이며, Spark Structured Streaming 은 첫 배치 때 이 옵션의 적용을 받지 않는다고 한다. maxOffsetsPerTrigger
를 작게 설정하면 당연히 Spark Application 에 부하는 가지 않겠지만 데이터 처리량이 줄어들게 되며, 이는 분석 서비스 장애와 직결된다.이러한 이유 때문에 현재는 maxOffsetsPerTrigger
를 작게 설정해서도 서비스가 가능한 수준이지만 추 후 kafka topic 의 partition 을 늘리고 같은 그룹으로 Spark Structured Streaming Application 을 늘려서 동작을 시켜보고자 한다.