DataFrame API | SQL Query | Description |
---|---|---|
select | SELECT | Returns the selected columns |
count | COUNT | Returns the number of rows |
filter, where | WHERE | Filters rows by a condition |
isNull | IS NULL | Returns True when the value is null |
alias | AS (can be omitted) | Returns the column renamed with an alias |
when | CASE WHEN | Returns different values depending on a condition |
drop | X | Drops a column |
withColumn | X | Adds a new column from an expression |
collect_set, collect_list | X | Collects values matching a condition into a set / list |
size | X | Returns the length of an array column |
to_date | TO_DATE | Converts a column to a date |
cast | CAST | Converts a column's data type |
To use SQL queries, the DataFrame must first be registered as a temp view.
df.createOrReplaceTempView('sql_table')
select
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df.select("Date", "StoreName").show()
# Output:
# +----------+--------------------+
# | Date| StoreName|
# +----------+--------------------+
# |07/17/2012|HY-VEE FOOD STORE...|
# |05/29/2014| SAHOTA FOOD MART|
# |06/19/2012|HY-VEE FOOD STORE...|
# +----------+--------------------+
spark.sql('''
select Date, StoreName
from sql_table
''').show()
# Output:
# +----------+--------------------+
# | Date| StoreName|
# +----------+--------------------+
# |07/17/2012|HY-VEE FOOD STORE...|
# |05/29/2014| SAHOTA FOOD MART|
# |06/19/2012|HY-VEE FOOD STORE...|
# +----------+--------------------+
count
Both the DataFrame API and SQL queries have a count function, but the two behave differently.
DataFrame API: count is an action, so it returns an integer rather than a DataFrame.
SQL Query: returns a DataFrame.
df.count()
# Output:
# 26160915
spark.sql('''
select count(*)
from sql_table
''').show()
# Output:
# +--------+
# |count(1)|
# +--------+
# |26160915|
# +--------+
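If you need the SQL result as a plain integer like df.count() returns, one option (a minimal sketch) is to pull the value out of the returned single-row DataFrame:
# Extract the scalar from the one-row result; cnt is then a Python int
cnt = spark.sql('select count(*) as cnt from sql_table').first()['cnt']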
filter / where
df.filter(F.col("StoreName") == "URBAN LIQUOR").select("StoreName").show()
# or
df.where(F.col("StoreName") == "URBAN LIQUOR").select("StoreName").show()
# Output:
# +------------+
# | StoreName|
# +------------+
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# +------------+
spark.sql('''
select StoreName
from sql_table
where StoreName = 'URBAN LIQUOR'
''').show()
# Output:
# +------------+
# | StoreName|
# +------------+
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# +------------+
isNull
df.filter(F.col("City").isNull()).select("StoreName", "City").show()
# Output:
# +--------------------+----+
# | StoreName|City|
# +--------------------+----+
# | THE MUSIC STATION|null|
# |HY-VEE FOOD STORE...|null|
# |QUICK SHOP / CLEA...|null|
# +--------------------+----+
spark.sql('''
select StoreName, City
from sql_table
where City IS NULL
''').show()
# Output:
# +--------------------+----+
# | StoreName|City|
# +--------------------+----+
# | THE MUSIC STATION|null|
# |HY-VEE FOOD STORE...|null|
# |QUICK SHOP / CLEA...|null|
# +--------------------+----+
alias
df.filter(F.col("City").isNull()).select("StoreName", F.col("City").alias("City_null")).show()
# Output:
# +--------------------+---------+
# | StoreName|City_null|
# +--------------------+---------+
# | THE MUSIC STATION| null|
# |HY-VEE FOOD STORE...| null|
# |QUICK SHOP / CLEA...| null|
# +--------------------+---------+
spark.sql('''
select StoreName, City as City_null
from sql_table
where City IS NULL
''').show()
# or
spark.sql('''
select StoreName, City City_null -- AS (alias keyword) omitted
from sql_table
where City IS NULL
''').show()
# Output:
# +--------------------+---------+
# | StoreName|City_null|
# +--------------------+---------+
# | THE MUSIC STATION| null|
# |HY-VEE FOOD STORE...| null|
# |QUICK SHOP / CLEA...| null|
# +--------------------+---------+
when
df.select(
"Pack",
F.when(
F.col("Pack") > 12, "over bulk"
).otherwise("unit").alias("Pack_bin")
).show()
# Output:
# +----+---------+
# |Pack| Pack_bin|
# +----+---------+
# | 10| unit|
# | 48|over bulk|
# | 6| unit|
# +----+---------+
spark.sql('''
select Pack,
CASE WHEN Pack > 12 THEN 'over bulk' ELSE 'unit'
END AS Pack_bin
from sql_table
''').show()
# Output:
# +----+---------+
# |Pack| Pack_bin|
# +----+---------+
# | 10| unit|
# | 48|over bulk|
# | 6| unit|
# +----+---------+
drop
The DataFrame API has a drop function, but SQL queries have no drop.
DataFrame API: drop
SQL Query: X (select only the columns you need, or use * for all of them)
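As a quick illustration (assuming the same df used above, which includes a City column), dropping a column with the DataFrame API could look like this:
# Drop the "City" column; a new DataFrame without it is returned
df.drop("City").columns
# "City" no longer appears in the column list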
withColumn
DataFrame API: withColumn
SQL Query: X
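A minimal sketch of adding a derived column with withColumn (Pack is a column from the examples above; PackDouble is just an illustrative name):
# Add a new column computed from an existing one
df.withColumn("PackDouble", F.col("Pack") * 2).select("Pack", "PackDouble").show(3)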
collect_set, collect_list
DataFrame API: collect_set, collect_list
SQL Query: X
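Before the windowed version below, here is a minimal sketch of the basic aggregate usage (Category and CategoryName are the columns used in the example that follows):
# Collect the distinct CategoryName values of each Category into an array column
df.groupBy("Category").agg(F.collect_set("CategoryName").alias("CategoryNames")).show(3)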
(df.withColumn("CategoryName_cnt", F.size(F.collect_set("CategoryName").over(W.partitionBy("Category"))))
.filter(F.col("CategoryName_cnt") >= 2)
.select("CategoryName", "Category", "CategoryName_cnt")
).show()
# Output:
# +--------------------+--------+----------------+
# | CategoryName|Category|CategoryName_cnt|
# +--------------------+--------+----------------+
# |STRAIGHT RYE WHIS...| 1011500| 2|
# |STRAIGHT RYE WHIS...| 1011500| 2|
# |STRAIGHT RYE WHIS...| 1011500| 2|
# +--------------------+--------+----------------+
spark.sql('''
WITH new_sql_table AS (
SELECT CategoryName, Category, max(CategoryName_dense) over (partition by Category) AS CategoryName_cnt
FROM (
SELECT CategoryName, Category,
dense_rank() OVER (PARTITION BY Category order by CategoryName) AS CategoryName_dense
FROM sql_table2
) as t
)
SELECT CategoryName, Category, CategoryName_cnt
FROM
new_sql_table
WHERE CategoryName_cnt >= 2
''').show()
# Output:
# +--------------------+--------+----------------+
# | CategoryName|Category|CategoryName_cnt|
# +--------------------+--------+----------------+
# |BOTTLED IN BOND B...| 1011500| 2|
# |BOTTLED IN BOND B...| 1011500| 2|
# |BOTTLED IN BOND B...| 1011500| 2|
# +--------------------+--------+----------------+
to_date
df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).select("Date").show()
# Output:
# +----------+
# | Date|
# +----------+
# |2012-07-17|
# |2014-05-29|
# |2012-06-19|
# +----------+
spark.sql('''
SELECT TO_DATE(Date, 'MM/dd/yyyy') Date
FROM sql_table2
''').show()
# Output:
# +----------+
# | Date|
# +----------+
# |2012-02-09|
# |2013-10-30|
# |2012-11-20|
# +----------+
cast
# Convert the numeric "StoreNumber" column to a string type
df.select(F.col("StoreNumber").cast("String"))
# Output:
# DataFrame[StoreNumber: string]
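The call above only prints the schema because .show() was not invoked; to inspect the values as well (mirroring the SQL output below), you could add it:
# Display the casted values
df.select(F.col("StoreNumber").cast("string")).show(3)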
spark.sql('''
SELECT CAST(StoreNumber as varchar(30)) -- varchar: variable-length string type
FROM sql_table2
''').show()
# Output:
# +-----------+
# |StoreNumber|
# +-----------+
# | 4076|
# | 3721|
# | 3869|
# +-----------+