from pyspark.sql import functions as F
path='/FileStore/tables/all/*.csv'
df = spark.read.format('csv').option('header','true').option('inferSchema', 'true').load(path).coalesce(5)
df.cache()
df.createOrReplaceTempView('dfTable')
df.count()
Out[5]: 541909
- count 메서드는 트랜스포메이션이 아닌 액션이라 전체 크기를 알아보는 용도가 아닌 캐싱 작업을 수행하는 용도로 사용되기도 한다.
집계 함수
count
df.select(F.count('StockCode')).show()
+----------------+
count(StockCode)|
+----------------+
541909|
+----------------+
- count(*) 구문은 null값을 가진 로우를 포함
- count함수에 특정 컬럼을 지정하면 null값 포함 X
countDistinct
df.select(F.countDistinct('StockCode')).show()
+-------------------------+
count(DISTINCT StockCode)|
+-------------------------+
4070|
+-------------------------+
approx_count_distinct
- 빅데이터를 사용해 연산을 수행하는 경우 질문에 대한 정확한 답을 얻기 위해서는 연산, 네트워크, 저장소 등 상당한 비용이 들 수밖에 없다.
- 따라서 수용 가능한 정도의 정확도에 맞춰 근사치를 계산하는 것이 비용을 고려했을 때 효율적
df.select(F.approx_count_distinct('StockCode',0.1)).show()
+--------------------------------+
approx_count_distinct(StockCode)|
+--------------------------------+
3364|
+--------------------------------+
- 0.1은 최대 추정 오류율(maximum estimation error)
- 위 예제에선 큰 오류율을 설정했기에 기대치에서 크게 벗어나는 결과를 얻었지만, countDistinct보다 빠르게 결과 반환
- 해당 예제에선 2.80초 -> 1.39초
- 대규모 데이터셋을 사용할 때 훨씬 더 성능이 좋아짐
first와 last
df.select(F.first('StockCode'), F.last('StockCode')).show()
+-----------------------+----------------------+
first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
85123A| 22138|
+-----------------------+----------------------+
min과 max
df.select(F.min('Quantity'), F.max('Quantity')).show()
+-------------+-------------+
min(Quantity)|max(Quantity)|
+-------------+-------------+
-80995| 80995|
+-------------+-------------+
sum
df.select(F.sum('Quantity')).show()
sumDistinct
df.select(F.sumDistinct('Quantity')).show()
avg
df.select(F.count('Quantity'), F.sum('Quantity'), F.avg('Quantity'), F.mean('Quantity')).show()
분산과 표준편차
- 스파크는 표본표준편차뿐만 아니라 모표준편차방식도 지원
- 모표준분산, 모표준편차: var_pop / stddev_pop
df.select(F.var_pop('Quantity'), F.var_samp('Quantity'),\
F.stddev_pop('Quantity'), F.stddev_samp('Quantity')).show()
비대칭도와 첨도
- 비대칭도와 첨도 모두 데이터의 변곡점을 측정하는 방법
- 비대칭도는 데이터 평균의 비대칭 정도를 측정
- 첨도는 데이터 끝 부분을 측정
df.select(F.skewness('Quantity'), F.kurtosis('Quantity')).show()
공분산과 상관관계
df.select(F.corr('InvoiceNo', 'Quantity'), F.covar_samp('InvoiceNo', 'Quantity'), F.covar_pop('InvoiceNo', 'Quantity')).show()
복합 데이터 타입의 집계
df.agg(F.collect_list('Country'), F.collect_set('country')).show()
그룹화
- 그룹화 작업은 두 단계로 이뤄짐
- 하나 이상의 컬럼 그룹화
- 집계 연산 수행
- 첫 번째 단계에서는 RelationalGroupedDataset이 반환
- 두 번째 단계에서는 DataFrame이 반환
표현식을 이용한 그룹화
- count함수는 select구문에 표현식으로 지정하기보다 agg메서드를 사용하는 것이 좋음
- agg메서드는 여러 집계 처리를 한 번에 지정할 수 있음
df.groupBy('InvoiceNo').agg(
F.count('Quantity').alias('quan'),
F.expr('count(Quantity)')
).show(5)
df.groupBy('InvoiceNo').agg(F.expr('avg(Quantity)'),F.expr('stddev_pop(Quantity)')).show(5)
윈도우 함수
- 데이터의 특정 윈도우를 대상으로 고유의 집계 연산 수행
- 윈도우 명세는 함수에 전달될 로우를 결정하는 것
- group-by함수와의 차이점
- group-by함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동
- 윈도우 함수는 프레임에 입력되는 모든 로우에 대해 결괏값을 계산
dfWithDate = df.withColumn("date", F.to_date(F.col("invoicedate"), "MM/d/yyyy H:mm"))
dfWithDate.show(5)
dfWithDate.createOrReplaceTempView('dfWithDate')
from pyspark.sql.window import Window
window = Window.partitionBy('customerId','date').orderBy(F.desc('quantity')).rowsBetween(Window.unboundedPreceding, Window.currentRow)
maxPurchaseQuantity = F.max(F.col('quantity')).over(window)
purchaseDenseRank = F.dense_rank().over(window)
purchaseRank = F.rank().over(window)
dfWithDate.where('customerid is not null').orderBy('customerid')\
.select(F.col('customerid'), F.col('date'), F.col('quantity'), maxPurchaseQuantity.alias('maxPurchaseQuantity'), purchaseDenseRank.alias('purchaseDenseRank'), purchaseRank.alias('purchsaeRank')).show(5)
그룹화 셋
- 여러 그룹에 걸쳐 집계할 수 있는 무언가가 필요할 때 그룹화 셋을 이용
- 여러 집계를 결합하는 저수준 기능
- group-by 구문에서 원하는 형태로 집계 생성
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView('dfNoNull')
롤업
- 그룹별 합계, 전체 합계를 구할 수 있음
- 링크
rollUp=dfNoNull.rollup('date', 'country').agg(F.sum('quantity')).orderBy('date')
rollUp.show(10)
dfNoNull.rollup('country','date').agg(F.sum('quantity')).orderBy('country').show(10)
- 두 컬럼 모두 null인 로우가 두 컬럼에 속한 레코드의 전체 합계
큐브
- 롤업을 고차원적으로 사용할 수 있게함
- 요소들을 계층적으로 다루는 대신, 모든 차원에 대해 동일한 작업 수행
- rollup 함수와는 다르게 인자의 순서가 달라도 결과는 같음
dfNoNull.cube('date','country').agg(F.sum('quantity')).orderBy('date').show(10)
- 그냥 모든 조합에 대한 합계를 구할 수 있다고 보면 될 듯
그룹화 메타데이터
- 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준을 조회하는 경우가 생길 수 있음
dfNoNull.cube('date','country').agg(F.sum('quantity'), F.grouping_id()).orderBy('date').show(10)
dfNoNull.rollup('date','country').agg(F.sum('quantity'), F.grouping_id()).orderBy('date').show(10)
- 0: date(첫 번째 인자)와 country(두 번째 인자)별 조합에 따라 총 수량 제공
- 1: country에 상관없이 date를 기반으로 총 수량 제공
- 2: date에 상관없이 country를 기반으로 총 수량 제공
- 3: country나 date 상관없이 총 수량 제공
피벗
- 로우를 컬럼으로 변환
- 피벗을 통해 컬럼의 모든 값을 단일 그룹화해서 계산 가능
pivoted = dfWithDate.groupBy('date').pivot('country').sum()
pivoted.printSchema()
사용자 정의 집계 함수
- 사용자 정의 집계 함수(user-defined aggregation function, UDAF)는 직접 제작한 함수나 비즈니스 규칙에 기반을 둔 자체 집계 함수를 정의하는 방법
- UDAF를 사용해서 입력 데이터 그룹에 직접 개발한 연산 수행
- 스파크는 입력 데이터의 모든 그룹의 중간 결과를 단일 AggregationBuffer에 저장해서 관리함
- UDAF는 현재 스칼라와 자바로만 사용 가능