log 1 - pyspark 코드 (기본)

Minseon Kim·2022년 7월 13일
0
post-thumbnail

해당 글은 pyspark로 코드를 작성하게 될 때, 필요한 코드가 생각나지 않을 때 보기 위한 자료이다.

Load orc file

import pyspark.sql.functions as F

path = "path/data.orc"
data  = spark.read.orc(path)

Load csv file

path = "path/data.csv"
data  = spark.read.csv(path)

select, create, filter columns

data_1 = data.filter(
	F.col(column1) == value # 미리 필요한 데이터만 들고오기 
).select(
	column1, 
    column2, 
    column3...
).withColumn(
	column4, ....
)
  • filter를 할 때 여러 조건이 들어가게 되면, 각 조건을 괄호로 묶어줘야 한다.
  • filter말고 where로 해도 된다
    * filter Vs where

groupBy, agg data

group_key = [key1, key2, key3]

data_1 = data.groupBy(
	group_key
).agg(
	F.sum(column_1), # 총합
    F.countDistinct(column_2), # unique한 개수만 세기
    F.count(column_3), # 전체 개수 세기
    F.mean(column_4), # 평균값
    F.stddev(column_5), # 표준편차
    F.min(F.col('column_6')).alias('min'), # 최솟값
    F.max(F.col('column_7')).alias('max'), # 최댓값
    F.expr('percentile(column_8, array(0.01))')[0].alias('1%'), # 백분위수 1%
    F.expr('percentile(column_9, array(0.25))')[0].alias('25%'), # 백분위수 25%
    F.expr('percentile(column_10,array(0.5))')[0].alias('50%'), # 백분위수 50%
    F.expr('percentile(column_11, array(0.75))')[0].alias('75%'), # 백분위수 75%
    F.expr('percentile(column_12, array(0.99))')[0].alias('99%'), # 백분위수 99%
)
  • expr을 활용하면 string형태로 쿼리를 직접 날릴 수 있다.
  • percentile은 array형태로 값을 return한다.
  • 단순히 aggregate를 하면 해당 열의 이름이 아주 길게 나오기 때문에 alias로 별칭을 줘야 보기 편하다.
  • 위의 식은 아래와 같이 써질 수 있다.
group_key = [key1, key2, key3]
agg_columns = [
	F.sum(column_1), # 총합
    F.countDistinct(column_2), # unique한 개수만 세기
    F.count(column_3), # 전체 개수 세기
    F.mean(column_4), # 평균값
    F.stddev(column_5), # 표준편차
    F.min(F.col('column_6')).alias('min'), # 최솟값
    F.max(F.col('column_7')).alias('max'), # 최댓값
    F.expr('percentile(column_8, array(0.01))')[0].alias('1%'), # 백분위수 1%
    F.expr('percentile(column_9, array(0.25))')[0].alias('25%'), # 백분위수 25%
    F.expr('percentile(column_10,array(0.5))')[0].alias('50%'), # 백분위수 50%
    F.expr('percentile(column_11, array(0.75))')[0].alias('75%'), # 백분위수 75%
    F.expr('percentile(column_12, array(0.99))')[0].alias('99%'), # 백분위수 99%
]

data_1 = data.groupBy(
	group_key
).agg(
	*agg_columns
)

이렇게 group하지 않고 단순히 데이터의 기초 통계를 보고 싶은 것이라면 아래와 같이 하면 된다.

data.summary("mean" , "stddev", "min", "1%","25%", "50%","75%", "99%", "max").show()

sort data

data_1 = data.orderBy(column_1)
  • 마치 sql과 비슷하다. 참고로 python pandas의 data frame을 사용할 때에는 sort_values()를 사용한다.

join data

data_2 = data_1.join(data_2, [join_key], how = 'left')
  • 조인 방식에는 여러가지가 있다.
    • right
    • anti
    • inner
    • left
  • 만약 조인하려는 데이터의 키들의 이름이 서로 다르다면 아래와 같이 해준다
data_2 = data_1.join(data_2, [data_1.join_key1 == data_2.join_key2], how = 'left')

OR

data_2.withColumn(join_key1, F.col(join_key2)) # 조인하려는 키와 똑같은 이름의 키를 만들어주기
data_2 = data_1.join(data_2, [join_key], how = 'left')
profile
1에서 10을 만들고 싶은 데이터 분석가

0개의 댓글