- Spark 내부동작
- Spark 파일포맷
데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요

- HDFS에 저장되어 있는 파일이 어떤 포맷으로 저장됐는지가 성능에 굉장한 영향을 미침
- 가장 선호되는 타입 : Structured
- Binary file, 압축되어 있음, 내부에 스키마 정보를 갖고 있음
Spark의 주요 파일 타입

- 1: 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
- Splittable = 데이터 블록들이 파티션으로 바로 올라갈 수 있는지
- 2: Spark의 기본 파일 포맷
- gzip으로 압축된 CSV, JSON 파일은 Splittable하지 않기 때문에 하나의 executor가 일단 처리하게 되며 메모리 에러가 날 확률이 높음
Parquet: Spark의 기본 파일 포맷
- 트위터와 클라우데라에서 공동 개발 (Doug Cutting)

DataFrame에서 다른 포맷 사용 방법
❖ DataFrame.write.format("avro"). …
❖ DataFrame.write.format("parquet"). …
- Execution Plan
다음 데이터 프레임 연산을 자세히 보자
spark.read.option("header", True).csv(“test.csv”). \
where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \
show()

- Transformations
- Narrow Dependencies: 독립적인 Partition level 작업
- Wide Dependencies: Shuffling이 필요한 작업
- groupby, reduceby, partitionby, repartition 등등
- Actions
- Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)
- Lazy Execution
- 어떤 장점이 존재할까?
- 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨
- 하나의 Job은 하나 혹은 그 이상의 Stage로 구성됨, 각 Stage는 하나 혹은 그 이상의 Transformations로 구성됨.
- 1 Stage = 셔플링 없이 독립적, 병렬적으로 가능한 연산들. (셔플링이 생기면 2개 Stage)
- 하나의 Stage는 다수의 Task들로 구성됨, 각 Task들은 하나의
Narrow Dependencies Transformation을 실행
Jobs, Stages, Tasks
- Action -> Job -> 1+ Stages -> 1+ Tasks
- Action
- Job을 하나 만들어내고 코드가 실제로 실행됨
- Job
- 하나 혹은 그 이상의 Stage로 구성됨
- Stage는 Shuffling이 발생하는 경우 새로 생김
- Stage
- DAG의 형태로 구성된 Task들 존재
- 여기 Task들은 병렬 실행이 가능
- Task
- 가장 작은 실행 유닛으로 Executor에 의해 실행됨

❖ 다음 DataFrame 조작 코드에 적용해보자
spark.read.option("header", True).csv(“test.csv”). \
where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \
show()

- SHOW Action이 만들어낸 Job은 2개의 Stage로 구성됨
- 첫번째 Stage = WHERE, SELECT
- 두번째 Stage = GROUPBY 이후에 COUNT
- .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨
- csv파일을 읽을 때 Schema의 타입을 제대로 알고 싶을 때 사용

WordCount 코드
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df = spark.read.text("shakespeare.txt") # show()로 불렀을 때 Job이 하나 생김
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
# 하나의 Job, groupby로 인해 2개의 Stage
# show()로 안부르면 Job이 안생김.
WordCount Stage Visualization


JOIN 코드
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/") # Job 1
df_small = spark.read.json("small_data/") # Job 2
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show() # Job 3
# 이때 Job은 3개가 만들어짐.
JOIN Stage Visualization

BROADCAST JOIN 코드
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()
# 이때
- 명시적으로 broadcast를 호출하지 않아도 옵션 세팅에 따라서 Spark이 알아서 최적화
- 이때 사용되는 configuration = spark.sql.adaptive.autoBroadcastJoinThreshold
BROADCAST JOIN Query Visualization

- Bucketing과 File System Partitioning
Bucketing과 File System Partitioning 소개
- 둘다 Hive 메타스토어의 사용이 필요: saveAsTable
- 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것
- Bucketing
- 셔플링을 최소화하는 것이 목적
- 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
- 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장
- File System Partitioning
- File System에 저장되는 데이터를 특정 컬럼 혹은 컬럼들의 집합으로 나누어 저장.
- 원래 Hive에서 많이 사용
- 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
- 위의 컬럼들을 Partition Key라고 부름
Bucketing
- DataFrame을 특정 ID(컬럼)를 기준으로 나눠서 테이블로 저장
- 한번 셔플링이 발생하지만 다수의 셔플링이 발생하는 것을 방지한다.
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
- DataFrameWriter의 bucketBy 함수 사용
- 데이터의 특성을 잘 알고 있는 경우 사용 가능

File System Partitioning
- 데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장
- DataFrame에서 이야기하는 Partition
- Hive에서 사용하는 Partitioning을 말함
- Partitioning의 예와 잇점
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 보통 위의 구조로 이미 저장되는 경우가 많음
- 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
- 데이터 관리도 쉬워짐 (Retention Policy 적용시)
- DataFrameWriter의 partitionBy 사용
- Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
- Cardinality(가능한 값의 경우의 수)가 낮은 것으로 선택해야 함.
