해당 글은 pyspark로 코드를 작성하게 될 때, 필요한 코드가 생각나지 않을 때 보기 위한 자료이다.
import pyspark.sql.functions as F
path = "path/data.orc"
data = spark.read.orc(path)
path = "path/data.csv"
data = spark.read.csv(path)
data_1 = data.filter(
F.col(column1) == value # 미리 필요한 데이터만 들고오기
).select(
column1,
column2,
column3...
).withColumn(
column4, ....
)
group_key = [key1, key2, key3]
data_1 = data.groupBy(
group_key
).agg(
F.sum(column_1), # 총합
F.countDistinct(column_2), # unique한 개수만 세기
F.count(column_3), # 전체 개수 세기
F.mean(column_4), # 평균값
F.stddev(column_5), # 표준편차
F.min(F.col('column_6')).alias('min'), # 최솟값
F.max(F.col('column_7')).alias('max'), # 최댓값
F.expr('percentile(column_8, array(0.01))')[0].alias('1%'), # 백분위수 1%
F.expr('percentile(column_9, array(0.25))')[0].alias('25%'), # 백분위수 25%
F.expr('percentile(column_10,array(0.5))')[0].alias('50%'), # 백분위수 50%
F.expr('percentile(column_11, array(0.75))')[0].alias('75%'), # 백분위수 75%
F.expr('percentile(column_12, array(0.99))')[0].alias('99%'), # 백분위수 99%
)
group_key = [key1, key2, key3]
agg_columns = [
F.sum(column_1), # 총합
F.countDistinct(column_2), # unique한 개수만 세기
F.count(column_3), # 전체 개수 세기
F.mean(column_4), # 평균값
F.stddev(column_5), # 표준편차
F.min(F.col('column_6')).alias('min'), # 최솟값
F.max(F.col('column_7')).alias('max'), # 최댓값
F.expr('percentile(column_8, array(0.01))')[0].alias('1%'), # 백분위수 1%
F.expr('percentile(column_9, array(0.25))')[0].alias('25%'), # 백분위수 25%
F.expr('percentile(column_10,array(0.5))')[0].alias('50%'), # 백분위수 50%
F.expr('percentile(column_11, array(0.75))')[0].alias('75%'), # 백분위수 75%
F.expr('percentile(column_12, array(0.99))')[0].alias('99%'), # 백분위수 99%
]
data_1 = data.groupBy(
group_key
).agg(
*agg_columns
)
이렇게 group하지 않고 단순히 데이터의 기초 통계를 보고 싶은 것이라면 아래와 같이 하면 된다.
data.summary("mean" , "stddev", "min", "1%","25%", "50%","75%", "99%", "max").show()
data_1 = data.orderBy(column_1)
data_2 = data_1.join(data_2, [join_key], how = 'left')
data_2 = data_1.join(data_2, [data_1.join_key1 == data_2.join_key2], how = 'left')
OR
data_2.withColumn(join_key1, F.col(join_key2)) # 조인하려는 키와 똑같은 이름의 키를 만들어주기
data_2 = data_1.join(data_2, [join_key], how = 'left')