[토크ON세미나] 아파치 스파크 입문 - 5강 운영과 모니터링 Q&A

넘실넘실·2025년 2월 1일

▶️ 원본 영상: https://youtu.be/eKLvaOJ3c5Q

🔖 Spark Runtime Architecture

  • 스파크 드라이브에서 메인 함수 실행, 스파크 세션 생성
    • Spark Context = Spark Session
  • spark submit으로 클러스터 매니저(Mesos, YARN, Standalone)로 job과 필요한 리소스(executor 개수, 코어 개수, executor 별 메모리) 요청
    • 클러스터 매니저의 역할은 요청에 맞게 워커를 할당. 리소스가 없다면 spark submit을 뱉어버림
  • 드라이버 프로그램의 스파크 세션 생성
    • Spark Context = Spark Session

🔖 Spark runtime components

  • 왼쪽: 클러스터 안에 드라이버가 포함
    • 제약된 리소스를 공유
    • 대부분의 애플리케이션들은 드라이버를 클러스터 안에 띄움. 드라이버 이슈가 발생하면 클러스터 매니저가 재구동해줌
  • 오른쪽: 클러스터 바깥에 드라이버가 포함
    • 전용 리소스를 점유해야하는 경우

🔖 Spark Driver

  • Spark session 생성
  • 클러스터 매니저에게 작업 할당된 executor에 대해서 어떤 태스크를 실행할 지

🔖 Terminology

  • Driver: SparkContext(=Spark session)를 포함하는 프로세스. 메인 함수 포함. 실제로 동작하는 애플리케이션
  • Executor: 실제로 스파크에서 병렬로 동작할 수 있도록 각각의 노드에 분산돼서 작동하는 프로세스
  • Master: 어플리케이션을 매니지할 수 있는 프로세스.
    • 클러스터 매니저의 구성(Yarn, Standalone 등등)에 따라 달라짐
  • Work: 노드 단위.

🔖 Spark Deployment

  • shared storage, object storage, s3, HDFS, 로컬 파일 시스템
  • 하둡에서는 Yarn을 사용
  • 스파크만 사용하겠다 → Standalone

🔖 Cluster Management

  • 드라이버 프로그램은 SparkContext 생성
  • 던진 코드는 각각의 worker executor들이 수행

🔖 Spark Deployement - Local Mode

./bin/spark-shell --master local[12]

./bin/spark-submiet --name "MyFirstApp" --master local[12] myApp.jar
val conf = new SparkConf()
              .setMaster("local[12]")
              .setAppnAme("MyFirstApp")
              .set("spark.executor.memory", "3g")
val sc = new SparkContext(conf)

🔖 Spark Deployement - StandAlone Mode

  • 스파크 전용 구성: 노드들 전체 리소스를 사용. CPU, 메모리 전체 사용
  • 주키퍼를 이용하여 마스터 이중화 관리를 주의해야 함
    • YARN은 마스터 관리를 따로 해주기 때문. 마스터가 죽거나 다시 살리는 작업을 하지만 StandAlone 모드는 백업이나 리더를 지정하여 관리를 해줘야 함

🔖 Spark Deployement - Yarn Mode

  • YARN을 쓰는 목적은 다양한 어플리케이션을 사용
  • YARN에서 전체 리소스 매니지먼트
  • spark job을 요청하면 YARN에서 리소스를 확인하고 정보를 줌
  • 리소스를 요청하면 YARN의 리소스 매니저가 노드 매니저를 통해서 애플리케이션이 동작 될 수 있도록 할당
  • 노드 매니저 안에 YARN 컨테이너로 구성
  • YARN 컨테이너 안에는 executor가 실행. 캐시와 태스크 영역이 들어가 있음
  • YARN mode
    • YARN client mode: 드라이버가 밖으로 빼야한다면
    • YARN cluster mode: 드라이버가 클러스터 내에서 실행할 수 있다면

🔖 Spark Deployment - Yarn Cluster

  • 리소스 매니저로부터 스파크 마스터와 executor를 할당 받게 됨

🔖 Spark Deployement - Yarn Client

  • 스파크 드라이버가 바깥의 클라이언트 쪽에서 로드
Spark Deployment - Running thrift server

sbin/start-thriftserver.sh
  --master yarn \
  --deploy-mode client \
  --executor-memeory 30G \
  --num-executor 60 \
  --executor-cores 8 \
  --driver-cores 12 \
  --supervise \
  --driver-memory 40G \
  --name "thrift"
  • thrift 서버를 띄우면 JDBC, ODBC 연결이 가능
  • 드라이버 쪽이 드라이버 thrift 서버가 됨
    -> 드라이버가 executor에 비해서 많은 리소스를 점유해야 함
    -> 드라이버 코어를 따로 설정하거나 클라이언트로 더 많은 자원을 드라이버에 주는 YARN client mode를 사용할 수 있음

🔖 Dynamic Resources Allocation on YARN

  • exeternal shuffle을 사용하여 리소스의 할당을 워커노드에 따라서 scale-in/out
    • scale out 하는 경우 수행하고 있는 RDD의 정보를 보내줘야 할 때 사용하는 것이 external shuffle
  • 설정 방법
    1. shuffle plugin add jar
    2. yarn-site.xml add plugin
<property>
  <name>yarn.nodemanager.aux-servies</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
  1. edit spark-default.conf
spark.dynamicAllocation.enalbe true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 50
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.ubutuakExecutors 50
spark.dynamicAllocation.cacheExecutorIdleTimeout 600

Spark Deployment - Job Server

  • YARN 기반으로 리소스 매니저를 사용하고 있고 매번 제출할 때마다 드라이버가 동작되면 SparkContext가 생성되고 executor 할당을 받아 job이 실행됨
    -> 그러나 부트 타임이 걸림. SQL을 한번 실행할거지만 SparkContext가 여러 개 생성됨
    -> 이를 해결 하기 위해 Job server 사용
  • 하나의 SparkContext를 공유하여 여러 개의 쿼리를 처리

🔖 Spark with YARN on EMR

  • S3에 데이터가 저장되고 데이터를 읽는 방식으로 진행
  • EMR 노드에 있는 디스크는 temporary 영역으로만 사용, 물리적으로 저장되는 건 S3

🔖 Spark meet K8s

  • 쿠버네티스 기반으로 스파크를 구동할 수 있음
  • 쿠버네티스 파드(pod): 여러가지 프로세스를 띄울 수 있음. 셔플 서비스를 통해 스케일링 지원
  • 쿠버네티스를 고려해볼 만한 케이스: ML 연산 작업 시 리소스를 많이 쓰거나 리소스를 해제 후 다시 받는 케이스가 많음

🔖 Design Choices

  • Scala vs Java vs Python vs R
    • 분석가는 pyspark 추천
    • R은 R 서버에 의존적임
  • Dataframe vs SQL vs Dtasets vs RDDs
    • Dataframe: 가장 일반적인 인터페이스
    • Dataset: 컴파일 타임 에러나 방어코드, 코드 제외 등등
  • Object Serialization in RDDs(spark.serializer to org.apache.spark.serializer.KryoSerializer)
  • Dynamic allocaiton:

🔖 Memory Pressure and Garbage Collection

  • Measuring the impact of gargace collection
    • adding -verbose: gc -XX:+PrintGCDetails -XX: +PrintGCTimeStamps to Spark’s JVM options using the spark.executor.extraJavaOption configuration paramter
  • Garbage collection tuning
    • ry the G1GC garbage collector with -XX:+UseG1GC
    • large executor heap sizes, it can be important to increase the G1 region size with -XX:G1HeapRegionSize

🔖 Parallelism

  • spark.default.parallelism
  • spark.sql.shuffle.partitions

🔖 Temporary Data Storage(Caching)

  • raw data를 불러와서 처리하는 4개의 데이터프레임이 존재한다고 가정하자. 캐쉬로 해서 각각 executor 캐시 영역으로 올려버리면 데이터프레임은 저 캐시를 바라보기 때문에 raw data를 처음부터 읽지 않음
DF1 = spark.read.format("csv")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .load("/data/flight-data/csv/2015-summary.csv")
DF2 = DF1.groupBy("DEST_CONTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.gruopBy("count").count().collect()
DF1.cache()
DF1.cont()

🔖 Data locality

  • Data locality means close the data is to the code to be processed
Data LocalityMeaningSpecial Notes
ANY데이터가 어디에 있는지 모름다른 옵션이 없는 한 권장되지 않음
PROCESS_LOCALJVM에 이미 포함
데이터와 코드가 같은 위치에 있음
locality가 가장 좋은 경우
NODE_LOCAL해당 블락 해당 노드에 있음네트워크를 타지 않고 디스크만 타면 됨
RACK_LOCAL노드에는 없지만 같은 RACK에는 있음내부 I/O만 사용
NO_PERF어디서든 동일하게 접근 가능has no locality preference

🔖 Resource management using YARN

🔖 JVM Memory Management

  • Java Heap 사이즈는 전체 executor 메모리의 90%
  • Storage 영역이 실제로 캐시되는 영역. 기본이 60%
  • 캐쉬가 다 차면 LRU 알고리즘에 의하여 오래된 것들은 ??

🔖 Monitoring

  • 클러스터에서는 드라이버와 각각의 executor을 모니터링
  • OS/머신
  • 클러스터
  • 네크워크
  • JVM

🔖 Spark First Aids

⚠️ Spark Job이 시작되지 않음

  • [증상]

    • submit 했지만 spark job이 시작되지 않음
    • spark UI가 드라이버 제외하고 클러스터에서 어떤 노드도 보이지 않음
    • spark UI에 들어가면 없다고 뜸
  • [원인]

    • 다른 포트를 사용하고 있음
    • 리소스를 제대로 할당받지 못하면 스파크 드라이버가 뜨지 않음

⚠️ Slow Tasks or Stragglers

  • 전체적으로 데이터가 균등하게 퍼져 동등하게 일을 하는 것이 베스트
  • [원인]
  • 전체 태스크 중에 종료된 건 80%인데 종료되지 않은 20%는 retry나 뭔가를 하고 있음. 그 - retry는 out of memory일 수도 있고 디스크나 네트워크 문제일 수도 있음
  • skew된 작업
  • [해결법]
    • 파티션을 늘림(리파티셔닝): 특정 블럭이 데이터가 균등하지 않고 커져있는 상태로 들어가 있다면 오래걸림.
      • 100개의 태스크가 있다고 가정하자. 200개를 리파티션 하면 200개의 태스크가 실행
    • 메모리를 더 올려줌

Driver OutOfMemoryError or Driver Unresponsice

⚠️ Spark Application is unresponsive or crashed OutOfMemoryErros or garbage collection messages in the driver logs

  • [해결법]
    • collect, count, take, show 같이 드라이버쪽에 호출되는 함수는 디버깅 단계에서 필요하지만 실제 어플리케이션에서는 사용 자제
    • broadcast join은 롱러닝 프로세스에서는 부담이 됨
      -> 드라이버 메모리나 드라이버 자원이 필요하다면 클라이언트처럼 따로 빼서 전담하여 메모리나 코어를 줄 수 있음

Executor OutOfMemoryError or Executor Unresponsive

⚠️ OutOfMemoryErros or garbage collection messages in the executor logs Executors that crash or become unresponsive

  • [원인]
    • external system과 연결하여 데이터를 옮김
  • [해결법]
    • 메모리 늘리기 = executor 늘리기
    • JVM 모니터링을 통해 executor 쪽 OutOfMemeory가 심하다면 JMX 통하여 모니터링
profile
어쩌다보니 데이터쟁이

0개의 댓글