요약
- spark DataFrame의 orderBy()는 정렬 기능 수행 (sql의 orderby, pandas의 sort_values와 유사)
- orderBy() 결과는 데이터프레임으로 반환.
- 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력할 수 있으며, 정렬 컬럼이 여러개일 경우 개별 컬럼을 인자로 넣거나 list로도 넣을 수 있음.
- ascending = True / False로 정렬 방식 구분
- 정렬 컬럼이 여러개 일때 개별 컬럼별로 서로 다른 정렬 옵션을 적용할 경우 -> ascending=[True, False]와 같은 형태로 이용.
from pyspark.sql.functions import col
# orderBy에 컬럼명을 문자열로 지정.
# select * from titanic_sdf order by Name desc
print("orderBy에 컬럼명을 문자열로 지정하고 내림 차순 정렬")
titanic_sdf.orderBy("Name", ascending=False).show()
# orderBy에 컬럼명을 컬럼형태로 지정.
# select * from titanic_sdf order by Name asc
print("orderBy에 컬럼명을 DataFrame['컬럼명'] 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(titanic_sdf['Name'], ascending=True).show()
print('orderBy에 컬럼명을 DataFrame.컬럼명 컬럼형태로 내림 차순 정렬')
titanic_sdf.orderBy(titanic_sdf.Name, ascending=False).show()
print("orderBy에 컬럼명을 col('컬럼명') 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(col('Name'), ascending=True).show()
from pyspark.sql.functions import col
print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 내림 차순 정렬")
# select * from titanic_sdf order by Pclass desc, Name desc
titanic_sdf.orderBy("Pclass", "Name", ascending=False).show()
titanic_sdf.orderBy(["Pclass", "Name"], ascending=False).show()
print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 내림 차순 정렬")
# select * from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.orderBy(col("Pclass"), col("Name"), ascending=False).show()
# orderBy에 여러개의 컬럼명을 지정하고 서로 다른 방식으로 정렬하기
from pyspark.sql.functions import col
print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy('Pclass', 'Name', ascending=[True, False]).show()
print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()
# 개별 컬럼별로 asc(), desc()를 적용.
# select * from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show()
# orderBy()와 동일한 메소드로 sort()를 제공.
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()
# select Pclass, Name from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.select(col('Pclass'), col('Name'))
.orderBy(col('Pclass').asc(), col('Name').desc()).show()
#select Pclass, Name from (select * from titanic_sdf order by Pclass asc, Name desc)
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc())
.select(col('Pclass'), col('Name')).show() #
요약
- pandas DataFrame은 DataFrame 객체에서 aggregation 메소드를 많이 가지고 있음
- pandas DataFrame은 aggregation 메소드를 적용 시 전체 컬럼들에 모두 agg 적용
- spark DataFrame은 DataFrame 객체에서 aggregation 메소드를 별로 가지고 있지 않음
- spark DataFrame에 aggregation 메소드를 적용 시
- pyspark.sql.functions 모듈의 max, min, sum 등의 함수를 이용해야함
print('#### pandas dataframe count() aggregation ####')
print(titanic_pdf.count())
print('#### pandas dataframe max() aggregation ####')
print(titanic_pdf.max())
print('#### pandas dataframe count() aggregation 결과 type ####')
print(type(titanic_pdf.count()))
# 하지만 count()를 제외한 agg 함수를 DataFrame에 적용하면 오류 발생
# 다른 aggregation 함수들은 어떤 컬럼을 aggregation 할지 명시해줘야 함.
# count()외의 다른 agg 함수, max,min 등은 pyspark.sql.functions 모듈에 구현
from pyspark.sql.functions import max, sum, min
# spark DataFrame에 count()를 제외하고 max(), min(), sum(), avg()와 같은
# aggregate 메소드를 바로 호출할 수 없으며, select()메소드 내에서 호출되어야 함.
# select max(Age) from titanic_sdf
titanic_sdf_max = titanic_sdf.select(max('Age'))
print(titanic_sdf_max.show())
# max() aggregation은 단 한개의 값을 반환하지만 DataFrame으로 반환.
print(type(titanic_sdf_max))