Spark 완벽 가이드 ch7. 다양한 데이터 타입 다루기

Q·2023년 1월 17일
0

Spark 완벽 가이드

목록 보기
8/24
from pyspark.sql import functions as F
path='/FileStore/tables/all/*.csv'
df = spark.read.format('csv').option('header','true').option('inferSchema', 'true').load(path).coalesce(5)
df.cache()
df.createOrReplaceTempView('dfTable')
df.count()
Out[5]: 541909
  • count 메서드는 트랜스포메이션이 아닌 액션이라 전체 크기를 알아보는 용도가 아닌 캐싱 작업을 수행하는 용도로 사용되기도 한다.

집계 함수

count

#다음 count 함수는 액션이 아닌 트랜스포메이션
df.select(F.count('StockCode')).show()
+----------------+ 
count(StockCode)| 
+----------------+ 
541909| 
+----------------+
  • count(*) 구문은 null값을 가진 로우를 포함
  • count함수에 특정 컬럼을 지정하면 null값 포함 X

countDistinct

#전체 레코드 수가 아닌 고유 레코드 수를 카운트
df.select(F.countDistinct('StockCode')).show()
+-------------------------+ 
count(DISTINCT StockCode)| 
+-------------------------+ 
4070| 
+-------------------------+

approx_count_distinct

  • 빅데이터를 사용해 연산을 수행하는 경우 질문에 대한 정확한 답을 얻기 위해서는 연산, 네트워크, 저장소 등 상당한 비용이 들 수밖에 없다.
  • 따라서 수용 가능한 정도의 정확도에 맞춰 근사치를 계산하는 것이 비용을 고려했을 때 효율적
#근사치만으로도 유의미하다면 해당 함수를 이용해 근사치 계산
df.select(F.approx_count_distinct('StockCode',0.1)).show()
+--------------------------------+ 
approx_count_distinct(StockCode)| 
+--------------------------------+ 
3364| 
+--------------------------------+
  • 0.1은 최대 추정 오류율(maximum estimation error)
  • 위 예제에선 큰 오류율을 설정했기에 기대치에서 크게 벗어나는 결과를 얻었지만, countDistinct보다 빠르게 결과 반환
    • 해당 예제에선 2.80초 -> 1.39초
    • 대규모 데이터셋을 사용할 때 훨씬 더 성능이 좋아짐

first와 last

#row 기반 동작
df.select(F.first('StockCode'), F.last('StockCode')).show()
+-----------------------+----------------------+ 
first(StockCode, false)|last(StockCode, false)| 
+-----------------------+----------------------+ 
85123A| 22138| 
+-----------------------+----------------------+

min과 max

df.select(F.min('Quantity'), F.max('Quantity')).show()
+-------------+-------------+ 
min(Quantity)|max(Quantity)| 
+-------------+-------------+ 
-80995| 80995| 
+-------------+-------------+

sum

df.select(F.sum('Quantity')).show()

sumDistinct

#고윳값 합산
df.select(F.sumDistinct('Quantity')).show()

avg

df.select(F.count('Quantity'), F.sum('Quantity'), F.avg('Quantity'), F.mean('Quantity')).show()

분산과 표준편차

  • 스파크는 표본표준편차뿐만 아니라 모표준편차방식도 지원
    • 모표준분산, 모표준편차: var_pop / stddev_pop
df.select(F.var_pop('Quantity'), F.var_samp('Quantity'),\
         F.stddev_pop('Quantity'), F.stddev_samp('Quantity')).show()

비대칭도와 첨도

  • 비대칭도와 첨도 모두 데이터의 변곡점을 측정하는 방법
    • 비대칭도는 데이터 평균의 비대칭 정도를 측정
    • 첨도는 데이터 끝 부분을 측정
df.select(F.skewness('Quantity'), F.kurtosis('Quantity')).show()

공분산과 상관관계

df.select(F.corr('InvoiceNo', 'Quantity'), F.covar_samp('InvoiceNo', 'Quantity'), F.covar_pop('InvoiceNo', 'Quantity')).show()

복합 데이터 타입의 집계

df.agg(F.collect_list('Country'), F.collect_set('country')).show()

그룹화

  • 그룹화 작업은 두 단계로 이뤄짐
    1. 하나 이상의 컬럼 그룹화
    2. 집계 연산 수행
  • 첫 번째 단계에서는 RelationalGroupedDataset이 반환
  • 두 번째 단계에서는 DataFrame이 반환

표현식을 이용한 그룹화

  • count함수는 select구문에 표현식으로 지정하기보다 agg메서드를 사용하는 것이 좋음
    • agg메서드는 여러 집계 처리를 한 번에 지정할 수 있음
df.groupBy('InvoiceNo').agg(
  F.count('Quantity').alias('quan'),
  F.expr('count(Quantity)')
).show(5)
df.groupBy('InvoiceNo').agg(F.expr('avg(Quantity)'),F.expr('stddev_pop(Quantity)')).show(5)

윈도우 함수

  • 데이터의 특정 윈도우를 대상으로 고유의 집계 연산 수행
  • 윈도우 명세는 함수에 전달될 로우를 결정하는 것
  • group-by함수와의 차이점
    • group-by함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동
    • 윈도우 함수는 프레임에 입력되는 모든 로우에 대해 결괏값을 계산
      • 프레임: 로우 그룹 기반의 테이블
dfWithDate = df.withColumn("date", F.to_date(F.col("invoicedate"), "MM/d/yyyy H:mm"))
dfWithDate.show(5)
dfWithDate.createOrReplaceTempView('dfWithDate')
#1.윈도우 명세 만들기
from pyspark.sql.window import Window
#그룹을 나누는 기준 컬럼, 파티션(그룹)의 정렬 방식, 첫 로우부터 현재 로우까지 확인
window = Window.partitionBy('customerId','date').orderBy(F.desc('quantity')).rowsBetween(Window.unboundedPreceding, Window.currentRow)
#2.집계함수 정의(1에서 정의한 윈도우 명세도 함께 사용)
maxPurchaseQuantity = F.max(F.col('quantity')).over(window)
purchaseDenseRank = F.dense_rank().over(window)
purchaseRank = F.rank().over(window)
dfWithDate.where('customerid is not null').orderBy('customerid')\
.select(F.col('customerid'), F.col('date'), F.col('quantity'), maxPurchaseQuantity.alias('maxPurchaseQuantity'), purchaseDenseRank.alias('purchaseDenseRank'), purchaseRank.alias('purchsaeRank')).show(5)

그룹화 셋

  • 여러 그룹에 걸쳐 집계할 수 있는 무언가가 필요할 때 그룹화 셋을 이용
  • 여러 집계를 결합하는 저수준 기능
  • group-by 구문에서 원하는 형태로 집계 생성
# null값에 따라 집계 수준이 달라짐 따라서 null값을 제거해야함
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView('dfNoNull')

롤업

  • 그룹별 합계, 전체 합계를 구할 수 있음
  • 링크
rollUp=dfNoNull.rollup('date', 'country').agg(F.sum('quantity')).orderBy('date')
rollUp.show(10)
#책엔 안나오지만 좀 더 찾아본 결과, 인자의 순서의 영향을 받는 듯 하다(위 링크 참고)
dfNoNull.rollup('country','date').agg(F.sum('quantity')).orderBy('country').show(10)
  • 두 컬럼 모두 null인 로우가 두 컬럼에 속한 레코드의 전체 합계

큐브

  • 롤업을 고차원적으로 사용할 수 있게함
  • 요소들을 계층적으로 다루는 대신, 모든 차원에 대해 동일한 작업 수행
  • rollup 함수와는 다르게 인자의 순서가 달라도 결과는 같음
dfNoNull.cube('date','country').agg(F.sum('quantity')).orderBy('date').show(10)
  • 그냥 모든 조합에 대한 합계를 구할 수 있다고 보면 될 듯

그룹화 메타데이터

  • 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준을 조회하는 경우가 생길 수 있음
dfNoNull.cube('date','country').agg(F.sum('quantity'), F.grouping_id()).orderBy('date').show(10)
dfNoNull.rollup('date','country').agg(F.sum('quantity'), F.grouping_id()).orderBy('date').show(10)
  • 0: date(첫 번째 인자)와 country(두 번째 인자)별 조합에 따라 총 수량 제공
  • 1: country에 상관없이 date를 기반으로 총 수량 제공
  • 2: date에 상관없이 country를 기반으로 총 수량 제공
  • 3: country나 date 상관없이 총 수량 제공

피벗

  • 로우를 컬럼으로 변환
  • 피벗을 통해 컬럼의 모든 값을 단일 그룹화해서 계산 가능
pivoted = dfWithDate.groupBy('date').pivot('country').sum()
pivoted.printSchema()

사용자 정의 집계 함수

  • 사용자 정의 집계 함수(user-defined aggregation function, UDAF)는 직접 제작한 함수나 비즈니스 규칙에 기반을 둔 자체 집계 함수를 정의하는 방법
  • UDAF를 사용해서 입력 데이터 그룹에 직접 개발한 연산 수행
  • 스파크는 입력 데이터의 모든 그룹의 중간 결과를 단일 AggregationBuffer에 저장해서 관리함
  • UDAF는 현재 스칼라와 자바로만 사용 가능
profile
Data Engineer

0개의 댓글