PySpark 1 - DataFrame 생성

no-glass-otacku·2026년 6월 19일

MS data school

목록 보기
20/25

Spark SQL과 DataFrame API

SQL과 Python DataFrame API, 두 가지 방법으로 같은 쿼리를 표현하는 방법 정리


읽기 전에 — 핵심 개념 정리

Q. 내용은 PySpark인데 왜 제목이 Spark SQL이야?

Spark SQL은 "SQL만 쓰는 것"이 아니라 구조화된 데이터를 처리하는 Spark 모듈 전체의 이름이다.

Spark SQL 모듈
├── SQL 방식          (%sql 셀에서 SELECT * FROM ...)
└── DataFrame API    (Python으로 spark.table(...).select(...))

둘 다 같은 Spark SQL 엔진을 쓰고, 결과도 동일하다. PySpark는 Python으로 Spark를 쓰는 것이고, DataFrame API가 PySpark의 핵심이다.

Q. SQL에서는 Spark를 쓰는지 어떻게 분간해?

Databricks에서 %sql 셀에 쓰는 SQL은 전부 Spark SQL이다.

Databricks 노트북
  %sql 셀         → 무조건 Spark SQL
  %python 셀      → spark. 붙으면 Spark 사용 중

Databricks 밖
  MySQL에서 SELECT ...      → 일반 SQL (Spark 아님)
  spark.sql("SELECT ...")   → Spark SQL (명시적으로 표시됨)

Databricks 안에서는 "항상 Spark SQL이다"라고 이해하면 된다.


1. 두 가지 방법으로 같은 쿼리 쓰기

Spark SQL은 SQL과 DataFrame API, 두 가지 방법으로 동일한 쿼리를 표현할 수 있다.

방법 1: SQL

%sql
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price

방법 2: DataFrame API (Python)

display(spark
        .table("products")
        .select("name", "price")
        .where("price < 200")
        .orderBy("price")
       )

둘 다 완전히 동일한 결과를 반환한다. 내부적으로 같은 쿼리 계획으로 변환되어 실행된다.


2. SparkSession

SparkSessionDataFrame API를 사용하는 Spark의 모든 기능에 대한 입구다.

Databricks 노트북에서는 자동으로 생성되어 spark 변수에 저장된다.

spark  # 이미 만들어져 있음

주요 메서드:

메서드설명
spark.sql("쿼리")SQL 실행 → DataFrame 반환
spark.table("테이블명")테이블을 DataFrame으로 가져오기
spark.read파일에서 DataFrame 읽기
spark.createDataFrame(...)테스트용 DataFrame 직접 생성
# SQL 실행 결과를 DataFrame으로 받기
result_df = spark.sql("""
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price
""")
display(result_df)

3. DataFrame

Q. DataFrame은 왜 만들어? 테이블이랑 같은 거야?

비슷하지만 다르다.

테이블                          DataFrame
──────────────────              ──────────────────
메타스토어에 등록됨               Python 변수에 저장됨
세션 끝나도 유지됨                세션 끝나면 사라짐
SQL에서 이름으로 조회 가능         Python 코드에서만 사용

DataFrame을 만드는 이유는 Python 코드 안에서 데이터를 변수처럼 다루기 위해서다.

DataFrame 생성 방법

① spark.table() — 테이블을 DataFrame으로 가져오기

products_df = spark.table("products")

② spark.sql() — SQL 실행 결과를 DataFrame으로 받기

result_df = spark.sql("""
  SELECT name, price
  FROM products
  WHERE price < 200
  ORDER BY price
""")
display(result_df)

③ spark.read — 파일에서 DataFrame 읽기

# CSV 파일에서 읽기
df = spark.read.csv("/path/to/file.csv", header=True)

# Parquet 파일에서 읽기
df = spark.read.parquet("/path/to/file.parquet")

# JSON 파일에서 읽기
df = spark.read.json("/path/to/file.json")

④ spark.createDataFrame() — 테스트용 DataFrame 직접 만들기

# 튜플 리스트로 직접 생성
data = [("Alice", 25), ("Bob", 30), ("Carol", 22)]
df = spark.createDataFrame(data, ["name", "age"])
display(df)

⑤ spark.range() — 숫자 범위로 DataFrame 만들기

# 0부터 9까지 숫자 DataFrame
df = spark.range(10)
display(df)

DataFrame 변수처럼 다루기

# 테이블에서 가져와서 변환 후 변수에 저장
budget_df = (spark
             .table("products")
             .select("name", "price")
             .where("price < 200")
             .orderBy("price")
            )

display(budget_df)  # 결과 출력

스키마 확인

# 스키마 속성으로 보기
budget_df.schema

# printSchema()로 보기 (더 읽기 쉬움)
budget_df.printSchema()

# 출력 예시:
# root
#  |-- name: string (nullable = true)
#  |-- price: double (nullable = true)

4. 변환(Transformation) vs 액션(Action)

DataFrame 메서드는 두 종류로 나뉜다.

변환 — 실행되지 않고 계획만 세움 (지연 평가)

# 이 코드는 실행해도 계산이 안 됨!
(products_df
  .select("name", "price")
  .where("price < 200")
  .orderBy("price"))
# → "이렇게 할 거야" 라는 계획만 만들어둠
변환 메서드설명예시
.select("컬럼1", "컬럼2")컬럼 선택.select("name", "price")
.where("조건")행 필터링.where("device = 'macOS'")
.filter("조건")where와 동일.filter("price < 200")
.orderBy("컬럼")오름차순 정렬.orderBy("price")
.drop("컬럼")컬럼 제거.drop("price")
.withColumn("새컬럼", 식)새 컬럼 추가/수정.withColumn("tax", col("price")*0.1)
.withColumnRenamed("기존", "새이름")컬럼 이름 변경.withColumnRenamed("name", "item_name")
.limit(n)상위 n개 행만.limit(10)
.distinct()중복 행 제거.distinct()
.groupBy("컬럼")그룹화.groupBy("category").count()

액션 — 실제 계산을 실행시키는 메서드

# .show()가 붙어야 비로소 실행됨
(products_df
  .select("name", "price")
  .where("price < 200")
  .orderBy("price")
  .show())           # ← 여기서 실제 계산 시작
액션 메서드설명예시
.show()상위 20개 행 출력df.show()
.show(n)상위 n개 행 출력df.show(5)
display(df)Databricks에서 표 형식으로 출력display(budget_df)
.count()행 수 반환 (숫자)budget_df.count()
.collect()모든 행을 Python 배열로 반환budget_df.collect()
.take(n)처음 n개 행을 배열로 반환budget_df.take(3)
.first()첫 번째 행 반환budget_df.first()
.describe()숫자/문자열 컬럼 기본 통계df.describe().show()

왜 지연 평가를 쓰냐면:

변환 10개를 연결해도 Spark는 실행 안 함
        ↓
액션 호출 시 변환 10개를 한꺼번에 최적화해서 실행
        ↓
불필요한 중간 계산 없이 한 번에 처리 → 빠름

5. DataFrame ↔ SQL 변환

DataFrame을 SQL에서 쓰고 싶으면 임시 뷰로 등록하면 된다.

# DataFrame → 임시 뷰 등록
budget_df.createOrReplaceTempView("budget")
-- 이제 SQL에서 budget 테이블처럼 쓸 수 있음
SELECT * FROM budget
# Python에서도 SQL처럼 쓰고 싶으면
display(spark.sql("SELECT * FROM budget"))

전체 흐름 정리

같은 쿼리를 두 가지로 표현 가능
  SQL (%sql 셀)                    DataFrame API (Python)
  SELECT * FROM products    ↔     spark.table("products")
  WHERE price < 200         ↔     .where("price < 200")
  ORDER BY price            ↔     .orderBy("price")

DataFrame 쓸 때 흐름
  spark.table() or spark.sql()     ← 생성
    ↓
  .select() .where() .orderBy()    ← 변환 (지연 평가, 실행 안 됨)
    ↓
  .show() .count() .collect()      ← 액션 (여기서 실제 실행)

DataFrame ↔ SQL 오가기
  df.createOrReplaceTempView("이름")  → SQL에서 테이블처럼 사용 가능
  spark.sql("SELECT * FROM 이름")     → SQL 결과를 DataFrame으로
profile
이제 개발해야지...

0개의 댓글