SQL과 Python DataFrame API, 두 가지 방법으로 같은 쿼리를 표현하는 방법 정리
Q. 내용은 PySpark인데 왜 제목이 Spark SQL이야?
Spark SQL은 "SQL만 쓰는 것"이 아니라 구조화된 데이터를 처리하는 Spark 모듈 전체의 이름이다.
Spark SQL 모듈
├── SQL 방식 (%sql 셀에서 SELECT * FROM ...)
└── DataFrame API (Python으로 spark.table(...).select(...))
둘 다 같은 Spark SQL 엔진을 쓰고, 결과도 동일하다. PySpark는 Python으로 Spark를 쓰는 것이고, DataFrame API가 PySpark의 핵심이다.
Q. SQL에서는 Spark를 쓰는지 어떻게 분간해?
Databricks에서 %sql 셀에 쓰는 SQL은 전부 Spark SQL이다.
Databricks 노트북
%sql 셀 → 무조건 Spark SQL
%python 셀 → spark. 붙으면 Spark 사용 중
Databricks 밖
MySQL에서 SELECT ... → 일반 SQL (Spark 아님)
spark.sql("SELECT ...") → Spark SQL (명시적으로 표시됨)
Databricks 안에서는 "항상 Spark SQL이다"라고 이해하면 된다.
Spark SQL은 SQL과 DataFrame API, 두 가지 방법으로 동일한 쿼리를 표현할 수 있다.
방법 1: SQL
%sql
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price
방법 2: DataFrame API (Python)
display(spark
.table("products")
.select("name", "price")
.where("price < 200")
.orderBy("price")
)
둘 다 완전히 동일한 결과를 반환한다. 내부적으로 같은 쿼리 계획으로 변환되어 실행된다.
SparkSession은 DataFrame API를 사용하는 Spark의 모든 기능에 대한 입구다.
Databricks 노트북에서는 자동으로 생성되어 spark 변수에 저장된다.
spark # 이미 만들어져 있음
주요 메서드:
| 메서드 | 설명 |
|---|---|
spark.sql("쿼리") | SQL 실행 → DataFrame 반환 |
spark.table("테이블명") | 테이블을 DataFrame으로 가져오기 |
spark.read | 파일에서 DataFrame 읽기 |
spark.createDataFrame(...) | 테스트용 DataFrame 직접 생성 |
# SQL 실행 결과를 DataFrame으로 받기
result_df = spark.sql("""
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price
""")
display(result_df)
Q. DataFrame은 왜 만들어? 테이블이랑 같은 거야?
비슷하지만 다르다.
테이블 DataFrame
────────────────── ──────────────────
메타스토어에 등록됨 Python 변수에 저장됨
세션 끝나도 유지됨 세션 끝나면 사라짐
SQL에서 이름으로 조회 가능 Python 코드에서만 사용
DataFrame을 만드는 이유는 Python 코드 안에서 데이터를 변수처럼 다루기 위해서다.
① spark.table() — 테이블을 DataFrame으로 가져오기
products_df = spark.table("products")
② spark.sql() — SQL 실행 결과를 DataFrame으로 받기
result_df = spark.sql("""
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price
""")
display(result_df)
③ spark.read — 파일에서 DataFrame 읽기
# CSV 파일에서 읽기
df = spark.read.csv("/path/to/file.csv", header=True)
# Parquet 파일에서 읽기
df = spark.read.parquet("/path/to/file.parquet")
# JSON 파일에서 읽기
df = spark.read.json("/path/to/file.json")
④ spark.createDataFrame() — 테스트용 DataFrame 직접 만들기
# 튜플 리스트로 직접 생성
data = [("Alice", 25), ("Bob", 30), ("Carol", 22)]
df = spark.createDataFrame(data, ["name", "age"])
display(df)
⑤ spark.range() — 숫자 범위로 DataFrame 만들기
# 0부터 9까지 숫자 DataFrame
df = spark.range(10)
display(df)
# 테이블에서 가져와서 변환 후 변수에 저장
budget_df = (spark
.table("products")
.select("name", "price")
.where("price < 200")
.orderBy("price")
)
display(budget_df) # 결과 출력
# 스키마 속성으로 보기
budget_df.schema
# printSchema()로 보기 (더 읽기 쉬움)
budget_df.printSchema()
# 출력 예시:
# root
# |-- name: string (nullable = true)
# |-- price: double (nullable = true)
DataFrame 메서드는 두 종류로 나뉜다.
# 이 코드는 실행해도 계산이 안 됨!
(products_df
.select("name", "price")
.where("price < 200")
.orderBy("price"))
# → "이렇게 할 거야" 라는 계획만 만들어둠
| 변환 메서드 | 설명 | 예시 |
|---|---|---|
.select("컬럼1", "컬럼2") | 컬럼 선택 | .select("name", "price") |
.where("조건") | 행 필터링 | .where("device = 'macOS'") |
.filter("조건") | where와 동일 | .filter("price < 200") |
.orderBy("컬럼") | 오름차순 정렬 | .orderBy("price") |
.drop("컬럼") | 컬럼 제거 | .drop("price") |
.withColumn("새컬럼", 식) | 새 컬럼 추가/수정 | .withColumn("tax", col("price")*0.1) |
.withColumnRenamed("기존", "새이름") | 컬럼 이름 변경 | .withColumnRenamed("name", "item_name") |
.limit(n) | 상위 n개 행만 | .limit(10) |
.distinct() | 중복 행 제거 | .distinct() |
.groupBy("컬럼") | 그룹화 | .groupBy("category").count() |
# .show()가 붙어야 비로소 실행됨
(products_df
.select("name", "price")
.where("price < 200")
.orderBy("price")
.show()) # ← 여기서 실제 계산 시작
| 액션 메서드 | 설명 | 예시 |
|---|---|---|
.show() | 상위 20개 행 출력 | df.show() |
.show(n) | 상위 n개 행 출력 | df.show(5) |
display(df) | Databricks에서 표 형식으로 출력 | display(budget_df) |
.count() | 행 수 반환 (숫자) | budget_df.count() |
.collect() | 모든 행을 Python 배열로 반환 | budget_df.collect() |
.take(n) | 처음 n개 행을 배열로 반환 | budget_df.take(3) |
.first() | 첫 번째 행 반환 | budget_df.first() |
.describe() | 숫자/문자열 컬럼 기본 통계 | df.describe().show() |
왜 지연 평가를 쓰냐면:
변환 10개를 연결해도 Spark는 실행 안 함
↓
액션 호출 시 변환 10개를 한꺼번에 최적화해서 실행
↓
불필요한 중간 계산 없이 한 번에 처리 → 빠름
DataFrame을 SQL에서 쓰고 싶으면 임시 뷰로 등록하면 된다.
# DataFrame → 임시 뷰 등록
budget_df.createOrReplaceTempView("budget")
-- 이제 SQL에서 budget 테이블처럼 쓸 수 있음
SELECT * FROM budget
# Python에서도 SQL처럼 쓰고 싶으면
display(spark.sql("SELECT * FROM budget"))
같은 쿼리를 두 가지로 표현 가능
SQL (%sql 셀) DataFrame API (Python)
SELECT * FROM products ↔ spark.table("products")
WHERE price < 200 ↔ .where("price < 200")
ORDER BY price ↔ .orderBy("price")
DataFrame 쓸 때 흐름
spark.table() or spark.sql() ← 생성
↓
.select() .where() .orderBy() ← 변환 (지연 평가, 실행 안 됨)
↓
.show() .count() .collect() ← 액션 (여기서 실제 실행)
DataFrame ↔ SQL 오가기
df.createOrReplaceTempView("이름") → SQL에서 테이블처럼 사용 가능
spark.sql("SELECT * FROM 이름") → SQL 결과를 DataFrame으로