Broadcast Variable.
- 룩업 테이블 등을 브로드캐스팅하여 셔플링을 막는 방식으로 사용.
- 브로드캐스트 조인에서 사용되는 것과 동일한 테크닉.
- 대부분 룩업 테이블을 Executor로 전송하는 데 사용.
- 많은 DB에서 스타 스키마 형태로 팩트 테이블과 디멘션 테이블을 분리.
- spark.sparkContext.broadcast를 사용.
- 룩업 테이블을 UDF로 보내는 방법.
- Closure.
- Executor쪽에서 보이는 함수나 변수.
- 테스크의 수만큼 발생.
- Broadcast.
- Serializator이 워커 노드 단위로 일어남.
- Broadcast 데이터셋의 제한 사항.
- 워커 노드로 공유되는 변경 불가 데이터.
- 테스크 단위로 처리될 수 있을 정도의 데이터 크기.
- 예제.
- 가장 인기있는 히어로 찾기.
- 히어로의 ID를 찾고 해당하는 이름 찾기.
- 1: 룩업 테이블을 DataFrame으로 로딩하고 조인.
- 2: 룩업 테이블을 브로드캐스팅하여 UDF에서 사용.
Accumulators.
- 일종의 전역 변수. (하둡의 카운터와 유사)
- 특정 이벤트의 수를 세는 데 사용.
- 레코드 별로 세거나 합을 구하는 데 사용.
- Transformation에서 사용. (with column)
- DataFrame/RDD Foreach에서 사용.
Speculative Execution.
- 느린 테스크를 다른 워커 노드에 있는 executor에서 중복 실행. 먼저 실행되는 테스크의 결과를 선택하고, 나머지 테스크들은 kill함.
- 워커 노드의 하드웨어 이슈로 인한 오류 방지 가능.
- Data Skew로 인해 오래 걸리는 경우 리소스 낭비의 문제점 존재.
Scheduler.
- Spark의 리소스 할당 정책.
- Spark App 간의 리소스 할당: 리소스 매니저가 결정하며, YARN의 경우 FIFO, FAIR, CAPACITY 방식으로 할당됨. (한 번 리소스를 할당 받으면 해당 리소스를 끝까지 들고 가는 것이 기본.)
- 하나의 Spark App 안에서 Job들 간의 리소스 할당: FIFO 형태로 처음 잡이 필요한 대로 리소스를 받아서 쓰는 것이 기본.
- FIFO(기본) vs. FAIR(라운드 로빈)
- 병렬성 증대 -> 쓰레드 활용. (FAIR 모드에서 유용.)
- spark.scheduler.mode = {FIFO, FAIR}
Spark App의 리소스 요구/릴리스 방식.
- Static(default) vs. Dynamic(
spark-submit --num-executors 100 --executor-cores 4 --executor-memory 32G
)
Dynamic Resource Allocation.
spark.dynamicAllocation.{환경변수}=true
로 환경변수 설정을 함.
Driver의 역할.
- Spark App = 1 Driver + n*Executor
- main 함수 실행 및 SparkSession/SparkContext 생성.
- 코드를 테스크로 변환하여 DAG 생성.
- 이를 executor/logical/physical plan으로 변환.
- 리소스 매니저의 도움을 받아 테스크들을 실행하고 관리.
- task의 수가 너무 많아지면 driver 메모리 에러 발생.
- 위 정보들을 web UI로 노출시킴(4040 port#).
Driver의 메모리 구성.
- spark.driver.memory, spark.driver.cores, spark.driver.memoryOverhead
Executor 메모리 구성.
- spark.executor.memory, spark.executor.cores, spark.executor.memoryOverhead
메모리 이슈 정리.
- Driver와 Executor에서 발생 가능한 이슈.
- Driver OOM(OutOfMemory):
- 큰 데이터셋을 collect로 호출했을 때 메모리가 충분하지 않을 경우
- 큰 데이터셋을 Broadcast JOIN할 경우.
- Python이나 R 등으로 작성된 코드.
- 테스크가 너무 많을 경우.
- Executor OOM:
- 너무 큰 executor.cores 값.
- Data Skew (Big Partition)
JVM과 파이썬 간의 통신.
- PySpark Driver = Python process + JVM process.
PySpark Memory.
- Spark은 JVM App이지만, PySpark은 Python 프로세스.
- JVM에서 바로 동작하지 못하고 JVM 메모리를 사용할 수 없음.
- spark.executor.pyspark.memory
- PySpark이 사용할 수 있는 메모리 값이 해당 값으로 고정됨 (디폴트는 overhead memory).- spark.python.worker.memory
- 디폴트 값이 512MB (JVM-Python 통신을 위해 Py4J가 필요한 메모리의 양)
Spark - Python 간의 통신.
- Py4J : 파이썬과 JVM 간의 데이터 교환을 통해 둘 간의 연동을 도와주는 프레임워크.
Caching과 Persist.
- Storage Memory Pool은 용량의 한계가 있으므로 캐싱이 필요함.
- 반복돼서 사용되는 데이터프레임을 메모리에 저장(캐싱).
- 캐싱에는 cache()와 persist()라는 두 가지 방법이 존재. (두 가지 모두 메모리나 디스크에 데이터프레임을 보존함.)
- 캐싱은 항상 파티션 단위로 메모리에 보존.
- persist는 인자를 통해서 세부 제어가 가능.
- useDisk, useMemory, useOffHeap, deserialized, replication 등등.
spark SQL을 사용한 Caching.
spark.sql("cache table table_name")
spark.sql("cache lazy table table_name")
spark.sql("uncache table table_name")
- 보통 LRU(Least Recently Used)를 기준으로 언캐싱됨.
spark.catalog.isCached("table_name")
: 테이블이 캐싱됐었는지 체크.spark.catalog.clearCache()
: 모든 캐시 클리어.- Spark Web UI의 Storage 탭에서 caching 여부 확인 가능.