230706 - 빅데이터 처리 시스템

김지석·2023년 7월 12일
0

- Spark 내부동작

- Spark 파일포맷

데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요

  • HDFS에 저장되어 있는 파일이 어떤 포맷으로 저장됐는지가 성능에 굉장한 영향을 미침
  • 가장 선호되는 타입 : Structured
    • Binary file, 압축되어 있음, 내부에 스키마 정보를 갖고 있음

Spark의 주요 파일 타입

  • 1: 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
    • Splittable = 데이터 블록들이 파티션으로 바로 올라갈 수 있는지
  • 2: Spark의 기본 파일 포맷
  • gzip으로 압축된 CSV, JSON 파일은 Splittable하지 않기 때문에 하나의 executor가 일단 처리하게 되며 메모리 에러가 날 확률이 높음

Parquet: Spark의 기본 파일 포맷

  • 트위터와 클라우데라에서 공동 개발 (Doug Cutting)

DataFrame에서 다른 포맷 사용 방법

❖ DataFrame.write.format("avro"). …
❖ DataFrame.write.format("parquet"). …

- Execution Plan

다음 데이터 프레임 연산을 자세히 보자

spark.read.option("header", True).csv(“test.csv”). \
	where("gender <> 'F'"). \
	select("name", "gender"). \
	groupby("gender"). \
	count(). \
	show()

Transformations and Actions

  • Transformations
    • Narrow Dependencies: 독립적인 Partition level 작업
      • select, filter, map 등등
    • Wide Dependencies: Shuffling이 필요한 작업
      • groupby, reduceby, partitionby, repartition 등등
  • Actions
    • Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)
    • Lazy Execution
    • 어떤 장점이 존재할까?
      • 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨
  • 하나의 Job은 하나 혹은 그 이상의 Stage로 구성됨, 각 Stage는 하나 혹은 그 이상의 Transformations로 구성됨.
  • 1 Stage = 셔플링 없이 독립적, 병렬적으로 가능한 연산들. (셔플링이 생기면 2개 Stage)
  • 하나의 Stage는 다수의 Task들로 구성됨, 각 Task들은 하나의
    Narrow Dependencies Transformation을 실행

Jobs, Stages, Tasks

  • Action -> Job -> 1+ Stages -> 1+ Tasks
  • Action
    • Job을 하나 만들어내고 코드가 실제로 실행됨
  • Job
    • 하나 혹은 그 이상의 Stage로 구성됨
    • Stage는 Shuffling이 발생하는 경우 새로 생김
  • Stage
    • DAG의 형태로 구성된 Task들 존재
    • 여기 Task들은 병렬 실행이 가능
  • Task
    • 가장 작은 실행 유닛으로 Executor에 의해 실행됨

Transformations and Actions 시각화

❖ 다음 DataFrame 조작 코드에 적용해보자

spark.read.option("header", True).csv(“test.csv”). \
	where("gender <> 'F'"). \
	select("name", "gender"). \
	groupby("gender"). \
	count(). \
	show()

  • SHOW Action이 만들어낸 Job은 2개의 Stage로 구성됨
  • 첫번째 Stage = WHERE, SELECT
  • 두번째 Stage = GROUPBY 이후에 COUNT
  • .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨
    • csv파일을 읽을 때 Schema의 타입을 제대로 알고 싶을 때 사용

WordCount 코드

spark = SparkSession \
	.builder \
	.master("local[3]") \
	.appName("SparkSchemaDemo") \
	.config("spark.sql.adaptive.enabled", False) \
	.config("spark.sql.shuffle.partitions", 3) \
	.getOrCreate()

df = spark.read.text("shakespeare.txt") # show()로 불렀을 때 Job이 하나 생김
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()

df_count.show()

# 하나의 Job, groupby로 인해 2개의 Stage
# show()로 안부르면 Job이 안생김.

WordCount Stage Visualization

JOIN 코드

spark = SparkSession \
	.builder \
	.master("local[3]") \
	.appName("SparkSchemaDemo") \
	.config("spark.sql.adaptive.enabled", False) \
	.config("spark.sql.shuffle.partitions", 3) \
	.getOrCreate()
    
df_large = spark.read.json("large_data/")    # Job 1
df_small = spark.read.json("small_data/")    # Job 2

join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")

join_df.show()    # Job 3

# 이때 Job은 3개가 만들어짐.

JOIN Stage Visualization

BROADCAST JOIN 코드

from pyspark.sql.functions import broadcast
spark = SparkSession \
	.builder \
	.master("local[3]") \
	.appName("SparkSchemaDemo") \
	.config("spark.sql.adaptive.enabled", False) \
	.config("spark.sql.shuffle.partitions", 3) \
	.getOrCreate()

df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")

join_df.show()

# 이때 
  • 명시적으로 broadcast를 호출하지 않아도 옵션 세팅에 따라서 Spark이 알아서 최적화
    • 이때 사용되는 configuration = spark.sql.adaptive.autoBroadcastJoinThreshold

BROADCAST JOIN Query Visualization

- Bucketing과 File System Partitioning

Bucketing과 File System Partitioning 소개

  • 둘다 Hive 메타스토어의 사용이 필요: saveAsTable
  • 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것
  • Bucketing
    • 셔플링을 최소화하는 것이 목적
    • 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
    • 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장
      • 이 때의 버킷의 개수도 지정
  • File System Partitioning
    • File System에 저장되는 데이터를 특정 컬럼 혹은 컬럼들의 집합으로 나누어 저장.
    • 원래 Hive에서 많이 사용
    • 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
      • 위의 컬럼들을 Partition Key라고 부름

Bucketing

  • DataFrame을 특정 ID(컬럼)를 기준으로 나눠서 테이블로 저장
    • 한번 셔플링이 발생하지만 다수의 셔플링이 발생하는 것을 방지한다.
    • 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
      • DataFrameWriter의 bucketBy 함수 사용
        • Bucket의 수와 기준 ID(컬럼) 지정
    • 데이터의 특성을 잘 알고 있는 경우 사용 가능

File System Partitioning

  • 데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장
    • DataFrame에서 이야기하는 Partition
    • Hive에서 사용하는 Partitioning을 말함
  • Partitioning의 예와 잇점
    • 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
      1. 데이터 자체를 연도-월-일의 폴더 구조로 저장
      2. 보통 위의 구조로 이미 저장되는 경우가 많음
    • 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
    • 데이터 관리도 쉬워짐 (Retention Policy 적용시)
  • DataFrameWriter의 partitionBy 사용
    • Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
    • Cardinality(가능한 값의 경우의 수)가 낮은 것으로 선택해야 함.

profile
초짜에요...

0개의 댓글