Appendix. PySpark 함수 vs SQL Query

dpwl·2024년 6월 15일
0

Data Analysis with SQL

목록 보기
106/120

1. PySpark 함수 vs SQL Query

DataFrame APISQL Query기능
selectSELECT선택한 컬럼을 반환
countCOUNTrow의 갯수를 반환
filter, whereWHERE조건을 걸어 필터링
isNullIS NULLNull 값일 경우 True 반환
aliasAS (생략 가능)컬럼의 이름을 별칭으로 변환해 반환
whenCASE WHEN조건에 따라 다른 값을 반환
dropX컬럼 삭제
withColumnX컬럼을 특정 변수로 추가
collect_set, collect_listX특정 조건에 맞는 값들의 set
sizearray 열의 길이를 반환
to_dateTO_DATE컬럼을 날짜 형식으로 변환
castCAST컬럼의 데이터 타입 변환

SQL Query를 사용하기 위해서는 temp view로 데이터프레임을 지정해줘야한다.

df.createOrReplaceTempView('sql_table')

1.1 select

from pyspark.sql import functions as F
from pyspark.sql import Window as W

df.select("Date", "StoreName").show()

# 결과값:
# +----------+--------------------+
# |      Date|           StoreName|
# +----------+--------------------+
# |07/17/2012|HY-VEE FOOD STORE...|
# |05/29/2014|    SAHOTA FOOD MART|
# |06/19/2012|HY-VEE FOOD STORE...|
# +----------+--------------------+
spark.sql('''
select Date, StoreName
from sql_table
''').show()

# 결과값:
# +----------+--------------------+
# |      Date|           StoreName|
# +----------+--------------------+
# |07/17/2012|HY-VEE FOOD STORE...|
# |05/29/2014|    SAHOTA FOOD MART|
# |06/19/2012|HY-VEE FOOD STORE...|
# +----------+--------------------+

1.2 count

DataFrame API와 SQL Query 모두 count라는 함수를 쓰지만 두 함수의 차이점이 있다.

  • DataFrame API: Action 함수이므로 항상 return 값이 DataFrame의 형태로 나오는 것이 아닌 정수값을 반환

  • SQL Query: DataFrame 형태를 반환

df.count()

# 결과값:
# 26160915
spark.sql('''
select count(*)
from sql_table
''').show()

# 결과값:
# +--------+
# |count(1)|
# +--------+
# |26160915|
# +--------+

1.3 filter/where

df.filter(F.col("StoreName") == "URBAN LIQUOR").select("StoreName").show()

# 또는

df.where(F.col("StoreName") == "URBAN LIQUOR").select("StoreName").show()

# 결과값:
# +------------+
# |   StoreName|
# +------------+
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# +------------+
spark.sql('''
select StoreName
from sql_table
where StoreName = 'URBAN LIQUOR'
''').show()

# 결과값:
# +------------+
# |   StoreName|
# +------------+
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# |URBAN LIQUOR|
# +------------+

1.4 isNull

df.filter(F.col("City").isNull()).select("StoreName", "City").show()

# 결과값:
# +--------------------+----+
# |           StoreName|City|
# +--------------------+----+
# |   THE MUSIC STATION|null|
# |HY-VEE FOOD STORE...|null|
# |QUICK SHOP / CLEA...|null|
# +--------------------+----+
spark.sql('''
select StoreName, City
from sql_table
where City IS NULL
''').show()

# 결과값:
# +--------------------+----+
# |           StoreName|City|
# +--------------------+----+
# |   THE MUSIC STATION|null|
# |HY-VEE FOOD STORE...|null|
# |QUICK SHOP / CLEA...|null|
# +--------------------+----+

1.5 alias

df.filter(F.col("City").isNull()).select("StoreName", F.col("City").alias("City_null")).show()

# 결과값:
# +--------------------+---------+
# |           StoreName|City_null|
# +--------------------+---------+
# |   THE MUSIC STATION|     null|
# |HY-VEE FOOD STORE...|     null|
# |QUICK SHOP / CLEA...|     null|
# +--------------------+---------+
spark.sql('''
select StoreName, City as City_null
from sql_table
where City IS NULL
''').show()

# 또는

spark.sql('''
select StoreName, City City_null	# as (별칭) 생략
from sql_table
where City IS NULL
''').show()

# 결과값:
# +--------------------+---------+
# |           StoreName|City_null|
# +--------------------+---------+
# |   THE MUSIC STATION|     null|
# |HY-VEE FOOD STORE...|     null|
# |QUICK SHOP / CLEA...|     null|
# +--------------------+---------+

1.6 when

df.select(
    "Pack",
    F.when(
        F.col("Pack") > 12, "over bulk"
    ).otherwise("unit").alias("Pack_bin")
).show()

# 결과값:
# +----+---------+
# |Pack| Pack_bin|
# +----+---------+
# |  10|     unit|
# |  48|over bulk|
# |   6|     unit|
# +----+---------+
spark.sql('''
select Pack,
       CASE WHEN Pack > 12 THEN 'over bulk' ELSE 'unit'
       END AS Pack_bin
from sql_table
''').show()

# 결과값:
# +----+---------+
# |Pack| Pack_bin|
# +----+---------+
# |  10|     unit|
# |  48|over bulk|
# |   6|     unit|
# +----+---------+

1.7 drop

DataFrame API은 drop 함수를 사용하지만 SQL Query는 drop 함수가 없다.

DataFrame API:

  • PySpark의 output은 또 다른 DataFrame 객체를 반환
  • select를 사용하지 않으면, 기본적으로 기본 table에서의 모든 column을 반환

SQL Query:

  • 항상 SELECT를 지정
  • 즉, SELECT에서 제외되면 자동으로 drop
  • 모든 column을 반환하고 싶다면 * 사용

1.8 withColumn

DataFrame API: withColumn

SQL Query: X

  • 기본적으로 원래 테이블의 구조를 변경하지 않음
  • SELECT에 새로운 계산으로 만들어진 컬럼을 반환하는 방식으로 진행

1.9 collect_set, collect_list

DataFrame API: collect_set, collect_list

SQL Qeury: X

  • SQL table은 array 형태의 column을 지원하지 않음
(df.withColumn("CategoryName_cnt", F.size(F.collect_set("CategoryName").over(W.partitionBy("Category"))))
  .filter(F.col("CategoryName_cnt") >= 2)
  .select("CategoryName", "Category", "CategoryName_cnt")
).show()

# 결과값:
# +--------------------+--------+----------------+
# |        CategoryName|Category|CategoryName_cnt|
# +--------------------+--------+----------------+
# |STRAIGHT RYE WHIS...| 1011500|               2|
# |STRAIGHT RYE WHIS...| 1011500|               2|
# |STRAIGHT RYE WHIS...| 1011500|               2|
# +--------------------+--------+----------------+
spark.sql('''
WITH new_sql_table AS (
  SELECT CategoryName, Category, max(CategoryName_dense) over (partition by Category) AS CategoryName_cnt
    FROM (
        SELECT CategoryName, Category,
              dense_rank() OVER (PARTITION BY Category order by CategoryName) AS CategoryName_dense
        FROM sql_table2
    ) as t
)
SELECT CategoryName, Category, CategoryName_cnt
FROM
  new_sql_table
WHERE CategoryName_cnt >= 2
''').show()

# 결과값:
+--------------------+--------+----------------+
# |        CategoryName|Category|CategoryName_cnt|
# +--------------------+--------+----------------+
# |BOTTLED IN BOND B...| 1011500|               2|
# |BOTTLED IN BOND B...| 1011500|               2|
# |BOTTLED IN BOND B...| 1011500|               2|
# +--------------------+--------+----------------+

1.10 to_date

df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).select("Date").show()

# 결과값:
# +----------+
# |      Date|
# +----------+
# |2012-07-17|
# |2014-05-29|
# |2012-06-19|
# +----------+
spark.sql('''
  SELECT TO_DATE(Date, 'MM/dd/yyyy') Date
  FROM sql_table2
''').show()

# 결과값:
# +----------+
# |      Date|
# +----------+
# |2012-02-09|
# |2013-10-30|
# |2012-11-20|
# +----------+

1.11 cast

# 실수형인 "StoreNumber"를 문자형으로 변경
df.select(F.col("StoreNumber").cast("String"))

# 결과값:
# DataFrame[StoreNumber: string]
spark.sql('''
  SELECT CAST(StoreNumber as varchar(30)) -- varchar: 가변의 문자열 타입
  FROM sql_table2
''').show()

# 결과값:
# +-----------+
# |StoreNumber|
# +-----------+
# |       4076|
# |       3721|
# |       3869|
# +-----------+
profile
거북선통통통통

0개의 댓글