SQL 쿼리를 사용하려면 데이터프레임을 임시 뷰로 등록해야 합니다:
# 데이터프레임을 Temp View로 변환
df.createOrReplaceTempView('sql_table')
SQL 쿼리를 사용하여 데이터 선택:
spark.sql('''
SELECT Date, StoreName
FROM sql_table
''').show()
| DataFrame API | SQL Query | 기능 |
|---|---|---|
| select | select | 선택한 컬럼을 반환 |
| count | count | row의 개수를 반환 |
| filter, where | where | 조건을 걸어 필터링 |
| isNull | IS NULL | NULL 값인 경우 True를 반환 |
| alias | as | 컬럼 이름 변경 |
| when | case when | 조건에 따라 다른 값을 반환 |
| drop | X | 컬럼을 삭제 |
| withColumn | X | 컬럼을 추가 또는 업데이트 |
| collect_set, list | X | 조건에 맞는 값을 set, list로 반환 |
| to_date | to_date | 날짜 형식으로 변환 |
# DataFrame API
df.filter(F.col("City").isNull()).select("StoreName", F.col("City").alias("City_null")).show()
# SQL Query
spark.sql('''
SELECT StoreName, City AS City_null
FROM sql_table
WHERE City IS NULL
''').show()
# DataFrame API
df.select(
"Pack",
F.when(F.col("Pack") > 12, "over bulk").otherwise("unit").alias("Pack_bin")
).show()
# SQL Query
spark.sql('''
SELECT Pack,
CASE WHEN Pack > 12 THEN 'over bulk' ELSE 'unit' END AS Pack_bin
FROM sql_table
''').show()
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df = df.withColumn(
"CategoryName_cnt",
F.size(F.collect_set("CategoryName").over(W.partitionBy("Category")))
).filter(F.col("CategoryName_cnt") >= 2)
# 결과 출력
df.select("CategoryName", "Category", "CategoryName_cnt").show()
spark.sql('''
WITH new_sql_table AS (
SELECT CategoryName, Category,
MAX(CategoryName_dense) OVER (PARTITION BY Category) AS CategoryName_cnt
FROM (
SELECT CategoryName, Category,
DENSE_RANK() OVER (PARTITION BY Category ORDER BY CategoryName) AS CategoryName_dense
FROM sql_table
)
)
SELECT CategoryName, Category, CategoryName_cnt
FROM new_sql_table
WHERE CategoryName_cnt >= 2
''').show()
컬럼 타입을 변환하려면 DataFrame API와 SQL Query를 사용합니다:
# DataFrame API
df.select(F.col("StoreNumber").cast("String")).show()
# SQL Query
spark.sql('''
SELECT CAST(StoreNumber AS VARCHAR(30)) AS StoreNumber
FROM sql_table
''').show()
날짜 데이터를 변환하는 방법은 아래와 같습니다:
# DataFrame API
df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).select("Date").show()
# SQL Query
spark.sql('''
SELECT TO_DATE(Date, 'MM/dd/yyyy') AS Date
FROM sql_table
''').show()
PySpark DataFrame API와 SQL Query는 각각 장단점이 있으며, 데이터 처리와 분석 목적에 맞게 선택하여 사용할 수 있습니다. SQL Query를 활용하면 직관적으로 데이터를 분석할 수 있으며, DataFrame API는 더 복잡한 연산과 변환에 유리합니다.