PySpark의 SQL vs Dataframe API

Ryan·2025년 1월 18일

SQL/Python 분석

목록 보기
87/94

PySpark와 SQL Query 활용하기

1. PySpark 데이터프레임을 SQL 쿼리로 변환

SQL 쿼리를 사용하려면 데이터프레임을 임시 뷰로 등록해야 합니다:

# 데이터프레임을 Temp View로 변환
df.createOrReplaceTempView('sql_table')

SQL 쿼리를 사용하여 데이터 선택:

spark.sql('''
    SELECT Date, StoreName
    FROM sql_table
''').show()

2. DataFrame API와 SQL Query 비교

DataFrame APISQL Query기능
selectselect선택한 컬럼을 반환
countcountrow의 개수를 반환
filter, wherewhere조건을 걸어 필터링
isNullIS NULLNULL 값인 경우 True를 반환
aliasas컬럼 이름 변경
whencase when조건에 따라 다른 값을 반환
dropX컬럼을 삭제
withColumnX컬럼을 추가 또는 업데이트
collect_set, listX조건에 맞는 값을 set, list로 반환
to_dateto_date날짜 형식으로 변환

주요 차이점

  1. DataFrame API는 Action 또는 Transformation을 사용합니다.
  2. SQL Query는 항상 새로운 데이터프레임을 반환하며, select 구문이 필수입니다.

3. SQL Query 활용 예제

Null 값 처리

# DataFrame API
df.filter(F.col("City").isNull()).select("StoreName", F.col("City").alias("City_null")).show()

# SQL Query
spark.sql('''
    SELECT StoreName, City AS City_null
    FROM sql_table
    WHERE City IS NULL
''').show()

조건부 컬럼 생성

# DataFrame API
df.select(
    "Pack",
    F.when(F.col("Pack") > 12, "over bulk").otherwise("unit").alias("Pack_bin")
).show()

# SQL Query
spark.sql('''
    SELECT Pack,
           CASE WHEN Pack > 12 THEN 'over bulk' ELSE 'unit' END AS Pack_bin
    FROM sql_table
''').show()

4. 집계와 그룹화

데이터프레임 API

from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df.withColumn(
    "CategoryName_cnt",
    F.size(F.collect_set("CategoryName").over(W.partitionBy("Category")))
).filter(F.col("CategoryName_cnt") >= 2)

# 결과 출력
df.select("CategoryName", "Category", "CategoryName_cnt").show()

SQL Query

spark.sql('''
    WITH new_sql_table AS (
        SELECT CategoryName, Category,
               MAX(CategoryName_dense) OVER (PARTITION BY Category) AS CategoryName_cnt
        FROM (
            SELECT CategoryName, Category,
                   DENSE_RANK() OVER (PARTITION BY Category ORDER BY CategoryName) AS CategoryName_dense
            FROM sql_table
        )
    )
    SELECT CategoryName, Category, CategoryName_cnt
    FROM new_sql_table
    WHERE CategoryName_cnt >= 2
''').show()

5. 컬럼 타입 변경

컬럼 타입을 변환하려면 DataFrame API와 SQL Query를 사용합니다:

# DataFrame API
df.select(F.col("StoreNumber").cast("String")).show()

# SQL Query
spark.sql('''
    SELECT CAST(StoreNumber AS VARCHAR(30)) AS StoreNumber
    FROM sql_table
''').show()

6. 날짜 형식 변환

날짜 데이터를 변환하는 방법은 아래와 같습니다:

# DataFrame API
df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).select("Date").show()

# SQL Query
spark.sql('''
    SELECT TO_DATE(Date, 'MM/dd/yyyy') AS Date
    FROM sql_table
''').show()

결론

PySpark DataFrame API와 SQL Query는 각각 장단점이 있으며, 데이터 처리와 분석 목적에 맞게 선택하여 사용할 수 있습니다. SQL Query를 활용하면 직관적으로 데이터를 분석할 수 있으며, DataFrame API는 더 복잡한 연산과 변환에 유리합니다.

0개의 댓글