D-61-데이터분석-dataframe API vs SQL query

박초화·2024년 3월 5일
0

dataframe api vs sql query

  1. select
df.select("Date", "StoreName").show()
spark.sql('''
select Date, StoreName
from sql_table
''').show()
  1. count
  • dataframe api의 action 함수
    • 항상 return 값이 dataframe의 형태로 나오는 것이 아님
  • sql query의 결과
    • dataframe 형태를 반환
df.count()
26160915
spark.sql('''
select count(*)
from sql_table
''').show()

+--------+
|count(1)|
+--------+
|26160915|
+--------+
  1. filter, where
  • dataframe api: filter, where
  • sql query: where
df.filter(F.col("StoreName") == "URBAN LIQUOR").select("StoreName").show()
spark.sql('''
select StoreName
from sql_table
where StoreName = 'URBAN LIQUOR'
''').show()
  1. isnull
  • dataframe api: isNull
  • sql query: IS NULL
df.filter(F.col("City").isNull()).select("StoreName", "City").show()
spark.sql('''
select StoreName, City
from sql_table
where City IS NULL
''').show()
  1. alias
  • dataframe api: alias
  • sql query: as (생략 가능)
df.filter(F.col("City").isNull()).select("StoreName", F.col("City").alias("City_null")).show()
spark.sql('''
select StoreName, City as City_null
from sql_table
where City IS NULL
''').show()
  1. when
  • dataframe api: when
  • sql query: case when
df.select(
    "Pack",
    F.when(
        F.col("Pack") > 12, "over bulk"
    ).otherwise("unit").alias("Pack_bin")
).show()
spark.sql('''
select Pack,
       CASE WHEN Pack > 12 THEN 'over bulk' ELSE 'unit'
       END AS Pack_bin
from sql_table
''').show()
+----+---------+
|Pack| Pack_bin|
+----+---------+
|  12|     unit|
|  12|     unit|
|  12|     unit|
|  12|     unit|
|  12|     unit|
|   6|     unit|
|  24|over bulk|
|  24|over bulk|
|   6|     unit|
|  12|     unit|
|  24|over bulk|
|  12|     unit|
|  24|over bulk|
|   6|     unit|
|  24|over bulk|
|  12|     unit|
|  12|     unit|
|  12|     unit|
|   6|     unit|
|   6|     unit|
+----+---------+
  1. withcolumn , collect_set, collect_list
  • dataframe api: withColumn
  • sql query: X
    • 기본적으로 원래 테이블의 구조를 변경하지 않음
    • select에 새로운 계산으로 만들어진 컬럼을 반환하는 방식으로 진행
  • dataframe api: collect_set, collect_list
  • sql qeury: X
    • sql table은 array 형태의 column을 지원하지 않음
# Category별로 다른 CategoryName이 몇개 있는지를 알고 싶었던 것
# pyspark의 경우 이를 size + collect_set을 통해 해결(
(df
    .withColumn("CategoryName_cnt", F.size(F.collect_set("CategoryName").over(W.partitionBy("Category"))))
    .filter(F.col("CategoryName_cnt") >= 2)
    .select("CategoryName", "Category", "CategoryName_cnt")
).show()
+--------------------+--------+----------------+
|        CategoryName|Category|CategoryName_cnt|
+--------------------+--------+----------------+
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
|STRAIGHT RYE WHIS...| 1011500|               2|
+--------------------+--------+----------------+
  • dense_rank
    • 중복 값들에 대해서 동일 순위로 표시
    • 중복 순위 다음 값에 대해서는 중복 값 개수와 상관없이 순차적인 순위 값을 출력
    • 즉, partition by를 기준으로 order by에 사용된 변수가 몇개인지 알 수 있음

max 를 통해 최대값을 가져와 몇개인지 판별에 사용함

  • with
    • 임시 테이블을 생성
    • 쿼리의 반복을 최소화하기 위함
(
    df
    .filter(F.col("Category") == 1011500)
    # sql with절로 다른 테이블로 올리는 것 때문에 java 서버 오류가 나서 일부만 필터링 해서 보여드립니다.
    .createOrReplaceTempView('sql_table2')
)

spark.sql('''
WITH new_sql_table AS (
  SELECT CategoryName, Category, max(CategoryName_dense) over (partition by Category) AS CategoryName_cnt
    FROM (
        SELECT CategoryName, Category,
              dense_rank() OVER (PARTITION BY Category order by CategoryName) AS CategoryName_dense
        FROM sql_table2
    ) as t
)
SELECT CategoryName, Category, CategoryName_cnt
FROM
  new_sql_table
WHERE CategoryName_cnt >= 2
''').show()
+--------------------+--------+----------------+
|        CategoryName|Category|CategoryName_cnt|
+--------------------+--------+----------------+
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
|BOTTLED IN BOND B...| 1011500|               2|
+--------------------+--------+----------------+
  1. to date
  • 날짜 데이터 형 변환
    • dataframe api: to_date
    • sql query: to_date
df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).select("Date").show()
spark.sql('''
  SELECT TO_DATE(Date, 'MM/dd/yyyy') Date
  FROM
  sql_table2
''').show()
+----------+
|      Date|
+----------+
|2012-05-03|
|2013-10-01|
|2015-01-15|
|2015-07-07|
|2013-08-06|
|2013-09-25|
|2012-09-11|
|2012-08-20|
|2015-08-20|
|2012-05-09|
|2012-08-13|
|2013-12-16|
|2012-12-06|
|2014-06-27|
|2013-10-29|
|2012-03-28|
|2012-11-12|
|2013-11-12|
|2014-01-02|
|2015-10-21|
+----------+
  1. cast
    : 컬럼 타입 변경
df.select(F.col("StoreNumber").cast("String"))
spark.sql('''
  SELECT CAST(StoreNumber as varchar(30)) -- varchar: 가변의 문자열 타입
  FROM
  sql_table2
''').show()
profile
도전적인 개발자

0개의 댓글