Spark 내부동작

데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요

Unstructured

  • Text -> Human readable

Semi-structured

  • Json, XML, CSV

Structured

  • Parquet(가장 많이 사용되는), avro, orc, SequenceFile


splittable 여부는 hdfs에 파일이 저장될 때, 데이터 블록 단위로 저장된다.그 데이터 블록이 업로드 될 때, spark에서 로딩이 될 때 파티션으로 바로 올라갈 수 있느냐? -> splittable

csv와 json은 압축이 되면 splittable하지 않는다. 압축이 안된 상태라면 splittable

parquet


parquet이 사용하는 방식

  • 하나의 데이터 블록은 하나의 row group으로 구성됨. 같은 row group 안에서는 column wise storage 방식으로 저장.

대부분 우리가 쓰는 데이터는 파티션이 1개이다. 데이터가 적기 때문에
df에 repartition을 해주게 되면, 동일한 갯수로 데이터가 분배된다.

df.coalesce는 파티션의 수를 줄이는 역할이다. 셔플링을 최소화하는 방향으로 파티션을 줄인다.

parquet은 파티션의 수만큼 나눠서 저장하는듯 -> 이 부분은 좀 더 알아보자.
parquet은 저장할 때, snappy.parquet인데, 이 부분은 압축 방식중 하나이다. 압축되어도 splitable하다.


parquet 같은 경우 mergeSchema를 통해서 schemaeveolution이 가능하다. 컬럼이 맞지 않아도 된다.


스키마 2,3의 컬럼이 달라도 합칠 수 있는 것을 확인할 수 있음.

Execution plan

spark.read.option("header", True).csv(“test.csv”). \ where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \ show()

show를 하면 스파크에서 도는 데이터프레임이 일부 드라이버쪽으로 넘어와서 디스플레이 되는 것

이 것은 하나 혹은 그 이상의 파티션으로 구성될 것임.
filtering(where) or selecting같은 경우 셔플링이 필요없고 해당 파티션에서 작동 가능-> executor에서 돌아가는 task들이 독립적으로 수행.

groupby가 호출 되는 순간, group by key에 맞게 같은 값을 가진 레코드끼리 같은 파티션으로 재정렬, 이 때 셔플링이 생김.

count는 해당 파티션 내에서 가능 병렬적으로 수행가능하다.

show는 action이라고 함. 앞의 작업을 실제로 실행을 시키는 역할. spark는 lazy execution을 하게 됨. 처음에는 앞의 transforamtion 단의 작용들은 수행되지 않음. write, read, collect가 해당된다.

transformation에는 2가지 종류가 포함된다.
파티션 내에서 병렬적으로 독립적으로 가능한 narrow transformation이 있고
셔플링이 필요한 wide transformation이 있다.

Transformations

● Narrow Dependencies: 독립적인 Partition level 작업 1. select, filter, map 등등
● Wide Dependencies: Shuffling이 필요한 작업 1. groupby, reduceby, partitionby, repartition 등등
❖ Actions
● Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨) -> job이라고 부름 하나의 job은 하나 혹은 그 이상의 stage로 구성되고, stage는 하나 혹은 다수의 transformation으로 구성.
stage는 셔플링 없이 병렬적으로 독립적으로 가능한 연산

● Lazy Execution
1. 어떤 장점이 존재할까?
• 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘 할 수 있음.그래서SQL이더선호됨

결론

❖ Action -> Job -> 1+ Stages -> 1+ Tasks ❖ Action
● Job을 하나 만들어내고 코드가 실제로 실행됨
❖ Job
● 하나 혹은 그 이상의 Stage로 구성됨
● Stage는 Shuffling이 발생하는 경우 새로 생김

  • stage는 task들의 집합, 하나의 task는 narrow transformation으로 구성

❖ Stage에 속한 task들은 파티션 수 만큼 병렬로 진행

  • task란 가장 작은 실행 유닛. executor의 태스크로 실행된 것이고, 하나의 narrow dependency trasformation이라고 하는 것이다.

● DAG의 형태로 구성된 Task들 존재
● 여기 Task들은 병렬 실행이 가능

❖ Task
● 가장 작은 실행 유닛으로 Executor에 의해 실행됨

위의 코드는 job이 2개다.
왜냐하면 첫번째 header가 true이기 때문에 무엇이 있는지 확인해야 스키마를 확인할 수 있는 것이다.

두번째 job은 show라는 트리거에의해 실행.
stage 간에는 exchange가 있다. 결국 셔플링이 필요한 것이다.

우리가 스키마를 더 잘 파악하기 위해 inferschema option을 넣는 다면, 하나의 job이 더 붙는 것이다.

데이터프레임이 2가지 종류가 있다고 하자. 하나는 작은 byte를 가진 데이터프레임이라면, 최적화를 위해 broadcast join을 사용할 수 있다.

일반 join

broadcast join

join_df = df_large.join(df_small, join_expr, "inner")

join_df = df_large.join(broadcast(df_small), join_expr, "inner")

spark.sql.adaptive.autoBroadcastJoinThreshold 라는 환경변수를 통해 일정 byte 이내라면 broadcast로 진행하도록 한다.

Bucketing 과 Partitioning

입력되는 데이터가 얼마나 최적화된 format으로 있는 가가 처리되는 시간과 리소를 단축할 수 있다.

여기서 partitioning은 spark의 파티션과는 다르다.

◆ Bucketing과 File System Partitioning 소개
❖ 둘다 Hive 메타스토어의 사용이 필요: saveAsTable
❖ 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것
이 두 방법 모두 스파크 테이블을 이용.
최적화를 통해서 리소스와 시간 단축하기위해 사용.

Bucketing

● 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
● 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장
▪ 이 때의 버킷의 수도 지정

이 경우 지정된 버킷의 수만큼 파티션이 만들어지는데, 파티션의 수도 굉장히 중요해짐. join 대상이 되는 두 테이블의 파티션이 같지 않다면 또 셔플링이 필요하기 때문에, bucketing을 하는 것이 큰 의미가 없다.

File System Partitioning

● 원래 Hive에서 많이 사용
● 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
▪ 위의 컬럼들을 Partition Key라고 부름 이 key는 하나 혹은 그 이상의 컬럼이여도 상관 없다.

Bucketing

이 때 한번 셔플링이 생기긴함. 하지만 이 셔플링을 통해 많은 셔플링을 막을 수 있다면 더 효율적임.

bucketBy(3,"id")

버킷의 수 만큼 hdfs에 저장

❖ DataFrame을 특정 ID를 기준으로 나눠서 테이블로 저장
● 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
▪ DataFrameWriter의 bucketBy 함수 사용
• Bucket의 수와 기준 ID 지정
● 데이터의 특성을 잘 알고 있는 경우 사용 가능

File System Partitioning

❖ 데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장
● DataFrame에서 이야기하는 Partition xxx
● Hive에서 사용하는 Partitioning을 말함
❖ Partitioning의 예와 잇점
● 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
1. 데이터 자체를 연도-월-일의 폴더 구조로 저장
2. 보통 위의 구조로 이미 저장되는 경우가 많음
● 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
● 데이터 관리도 쉬워짐 (Retention Policy 적용시)
❖ DataFrameWriter의 partitionBy 사용
● Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
파티션 -> key에 맞게 디렉토리가 생성되기 때문에 만약 많이 생기면 오버헤드

  • 그래서 cardinality라고 함.

만약 연도 월 일 시간 컬럼이 있다면, partition by로 저장하게 된다면 4개의 파티션 키가 있다.

정리

내가 생각하는 버켓팅과 파티셔닝의 차이

버켓팅

  • 버켓 id로 지정한 컬럼을 기준으로 레코드를 쪼개서 parquet으로 저장
    파티셔닝
  • 애초에 컬럼의 레코드의 값으로 ex)연도로 디렉토리를 생성해서 parquet 넣기

버켓팅을 제대로 하게 되면 셔플링이 생기지 않음. 굳이 다른 것을 찾아볼 필요가 없기 때문에? -> 이부분은 봐야할 듯.

bucketing


exchange가 있다는 것은 셔플링이 존재했다는 것

profile
군도리

0개의 댓글

Powered by GraphCDN, the GraphQL CDN