[Spark] Spark 데이터프레임 주요 메서드 - (2) orderBy, aggregation

baekdata·2022년 3월 1일
0

Spark

목록 보기
5/8

1. orderBy 메서드

요약

  • spark DataFrame의 orderBy()는 정렬 기능 수행 (sql의 orderby, pandas의 sort_values와 유사)
  • orderBy() 결과는 데이터프레임으로 반환.
  • 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력할 수 있으며, 정렬 컬럼이 여러개일 경우 개별 컬럼을 인자로 넣거나 list로도 넣을 수 있음.
  • ascending = True / False로 정렬 방식 구분
  • 정렬 컬럼이 여러개 일때 개별 컬럼별로 서로 다른 정렬 옵션을 적용할 경우 -> ascending=[True, False]와 같은 형태로 이용.

a. 하나의 컬럼을 기준으로 정렬

from pyspark.sql.functions import col 

# orderBy에 컬럼명을 문자열로 지정. 
# select * from titanic_sdf order by Name desc
print("orderBy에 컬럼명을 문자열로 지정하고 내림 차순 정렬")
titanic_sdf.orderBy("Name", ascending=False).show() 

# orderBy에 컬럼명을 컬럼형태로 지정.
# select * from titanic_sdf order by Name asc
print("orderBy에 컬럼명을 DataFrame['컬럼명'] 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(titanic_sdf['Name'], ascending=True).show() 

print('orderBy에 컬럼명을 DataFrame.컬럼명 컬럼형태로 내림 차순 정렬')
titanic_sdf.orderBy(titanic_sdf.Name, ascending=False).show()

print("orderBy에 컬럼명을 col('컬럼명') 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(col('Name'), ascending=True).show()

b. 여러개의 컬럼을 기준으로 정렬

from pyspark.sql.functions import col

print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 내림 차순 정렬")
# select * from titanic_sdf order by Pclass desc, Name desc
titanic_sdf.orderBy("Pclass", "Name", ascending=False).show() 
titanic_sdf.orderBy(["Pclass", "Name"], ascending=False).show()

print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 내림 차순 정렬")
# select * from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.orderBy(col("Pclass"), col("Name"), ascending=False).show()

c. 여러개의 컬럼을 각각 다른 기준으로 정렬

# orderBy에 여러개의 컬럼명을 지정하고 서로 다른 방식으로 정렬하기
from pyspark.sql.functions import col 

print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy('Pclass', 'Name', ascending=[True, False]).show()

print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()

# 개별 컬럼별로 asc(), desc()를 적용. 
# select * from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show() 

# orderBy()와 동일한 메소드로 sort()를 제공. 
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()

# select Pclass, Name from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.select(col('Pclass'), col('Name'))
		  .orderBy(col('Pclass').asc(), col('Name').desc()).show() 

#select Pclass, Name from (select * from titanic_sdf order by Pclass asc, Name desc)
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc())
	       .select(col('Pclass'), col('Name')).show() # 

2. aggregation 메서드

요약

  • pandas DataFrame은 DataFrame 객체에서 aggregation 메소드를 많이 가지고 있음
  • pandas DataFrame은 aggregation 메소드를 적용 시 전체 컬럼들에 모두 agg 적용
  • spark DataFrame은 DataFrame 객체에서 aggregation 메소드를 별로 가지고 있지 않음
  • spark DataFrame에 aggregation 메소드를 적용 시
    • pyspark.sql.functions 모듈의 max, min, sum 등의 함수를 이용해야함

a. Pandas DF에서의 aggregation

print('#### pandas dataframe count() aggregation ####')
print(titanic_pdf.count())

print('#### pandas dataframe max() aggregation ####')
print(titanic_pdf.max())

print('#### pandas dataframe count() aggregation 결과 type ####')
print(type(titanic_pdf.count()))
  • 전체 컬럼들에 모두 aggregation 적용
  • agg 결과는 Series 타입을 가짐

b. Spark DF에서의 aggregation

# 하지만 count()를 제외한 agg 함수를 DataFrame에 적용하면 오류 발생
# 다른 aggregation 함수들은 어떤 컬럼을 aggregation 할지 명시해줘야 함. 
# count()외의 다른 agg 함수, max,min 등은 pyspark.sql.functions 모듈에 구현

from pyspark.sql.functions import max, sum, min

# spark DataFrame에 count()를 제외하고 max(), min(), sum(), avg()와 같은 
# aggregate 메소드를 바로 호출할 수 없으며, select()메소드 내에서 호출되어야 함. 

# select max(Age) from titanic_sdf
titanic_sdf_max = titanic_sdf.select(max('Age')) 
print(titanic_sdf_max.show())

# max() aggregation은 단 한개의 값을 반환하지만 DataFrame으로 반환. 
print(type(titanic_sdf_max)) 
  • agg 결과는 Dataframe 형태로 반환
profile
글쓰는 데이터 분석가

0개의 댓글