[데이터 엔지니어링 데브코스 2기] TIL-17주차 대용량 데이터 훈련 대비 Spark, SparkML 실습 (1)

이재호·2024년 2월 5일
0

1. Spark의 기타 개념


Broadcast Variable.

  • 룩업 테이블 등을 브로드캐스팅하여 셔플링을 막는 방식으로 사용.
    - 브로드캐스트 조인에서 사용되는 것과 동일한 테크닉.
    • 대부분 룩업 테이블을 Executor로 전송하는 데 사용.
      • 많은 DB에서 스타 스키마 형태로 팩트 테이블과 디멘션 테이블을 분리.
    • spark.sparkContext.broadcast를 사용.
  • 룩업 테이블을 UDF로 보내는 방법.
    - Closure.
    - Executor쪽에서 보이는 함수나 변수.
    - 테스크의 수만큼 발생.
    - Broadcast.
    - Serializator이 워커 노드 단위로 일어남.
    • Broadcast 데이터셋의 제한 사항.
      • 워커 노드로 공유되는 변경 불가 데이터.
        • 테스크 단위로 처리될 수 있을 정도의 데이터 크기.
  • 예제.
    - 가장 인기있는 히어로 찾기.
    • 히어로의 ID를 찾고 해당하는 이름 찾기.
      • 1: 룩업 테이블을 DataFrame으로 로딩하고 조인.
        • 2: 룩업 테이블을 브로드캐스팅하여 UDF에서 사용.

Accumulators.

  • 일종의 전역 변수. (하둡의 카운터와 유사)
  • 특정 이벤트의 수를 세는 데 사용.
  • 레코드 별로 세거나 합을 구하는 데 사용.
  • Transformation에서 사용. (with column)
  • DataFrame/RDD Foreach에서 사용.

Speculative Execution.

  • 느린 테스크를 다른 워커 노드에 있는 executor에서 중복 실행. 먼저 실행되는 테스크의 결과를 선택하고, 나머지 테스크들은 kill함.
  • 워커 노드의 하드웨어 이슈로 인한 오류 방지 가능.
  • Data Skew로 인해 오래 걸리는 경우 리소스 낭비의 문제점 존재.

Scheduler.

  • Spark의 리소스 할당 정책.
  • Spark App 간의 리소스 할당: 리소스 매니저가 결정하며, YARN의 경우 FIFO, FAIR, CAPACITY 방식으로 할당됨. (한 번 리소스를 할당 받으면 해당 리소스를 끝까지 들고 가는 것이 기본.)
  • 하나의 Spark App 안에서 Job들 간의 리소스 할당: FIFO 형태로 처음 잡이 필요한 대로 리소스를 받아서 쓰는 것이 기본.
  • FIFO(기본) vs. FAIR(라운드 로빈)
  • 병렬성 증대 -> 쓰레드 활용. (FAIR 모드에서 유용.)
  • spark.scheduler.mode = {FIFO, FAIR}

Spark App의 리소스 요구/릴리스 방식.

  • Static(default) vs. Dynamic(spark-submit --num-executors 100 --executor-cores 4 --executor-memory 32G)

Dynamic Resource Allocation.

  • spark.dynamicAllocation.{환경변수}=true로 환경변수 설정을 함.

Driver의 역할.

  • Spark App = 1 Driver + n*Executor
  • main 함수 실행 및 SparkSession/SparkContext 생성.
  • 코드를 테스크로 변환하여 DAG 생성.
  • 이를 executor/logical/physical plan으로 변환.
  • 리소스 매니저의 도움을 받아 테스크들을 실행하고 관리.
  • task의 수가 너무 많아지면 driver 메모리 에러 발생.
  • 위 정보들을 web UI로 노출시킴(4040 port#).

Driver의 메모리 구성.

  • spark.driver.memory, spark.driver.cores, spark.driver.memoryOverhead

Executor 메모리 구성.

  • spark.executor.memory, spark.executor.cores, spark.executor.memoryOverhead

메모리 이슈 정리.

  • Driver와 Executor에서 발생 가능한 이슈.
    - Driver OOM(OutOfMemory):
    - 큰 데이터셋을 collect로 호출했을 때 메모리가 충분하지 않을 경우
    - 큰 데이터셋을 Broadcast JOIN할 경우.
    - Python이나 R 등으로 작성된 코드.
    - 테스크가 너무 많을 경우.
    • Executor OOM:
      • 너무 큰 executor.cores 값.
        • Data Skew (Big Partition)

JVM과 파이썬 간의 통신.

  • PySpark Driver = Python process + JVM process.

PySpark Memory.

  • Spark은 JVM App이지만, PySpark은 Python 프로세스.
  • JVM에서 바로 동작하지 못하고 JVM 메모리를 사용할 수 없음.
  • spark.executor.pyspark.memory
    - PySpark이 사용할 수 있는 메모리 값이 해당 값으로 고정됨 (디폴트는 overhead memory).
  • spark.python.worker.memory
    - 디폴트 값이 512MB (JVM-Python 통신을 위해 Py4J가 필요한 메모리의 양)

Spark - Python 간의 통신.

  • Py4J : 파이썬과 JVM 간의 데이터 교환을 통해 둘 간의 연동을 도와주는 프레임워크.

Caching과 Persist.

  • Storage Memory Pool은 용량의 한계가 있으므로 캐싱이 필요함.
  • 반복돼서 사용되는 데이터프레임을 메모리에 저장(캐싱).
  • 캐싱에는 cache()와 persist()라는 두 가지 방법이 존재. (두 가지 모두 메모리나 디스크에 데이터프레임을 보존함.)
  • 캐싱은 항상 파티션 단위로 메모리에 보존.
  • persist는 인자를 통해서 세부 제어가 가능.
    - useDisk, useMemory, useOffHeap, deserialized, replication 등등.

spark SQL을 사용한 Caching.

  • spark.sql("cache table table_name")
  • spark.sql("cache lazy table table_name")
  • spark.sql("uncache table table_name")
  • 보통 LRU(Least Recently Used)를 기준으로 언캐싱됨.
  • spark.catalog.isCached("table_name") : 테이블이 캐싱됐었는지 체크.
  • spark.catalog.clearCache(): 모든 캐시 클리어.
  • Spark Web UI의 Storage 탭에서 caching 여부 확인 가능.
profile
천천히, 그리고 꾸준히.

0개의 댓글