스파크 SQL은 현대 스파크의 중심 축이다.
RDD 기반 연산보다 훨씬 효율적이고, 구조화된 데이터 처리를 위한 최적화된 API를 제공한다.
이 API의 핵심이 바로 DataFrame이다.
RDD vs DataFrame
| 항목 | RDD | DataFrame |
|---|---|---|
| 데이터 구조 | 비구조화된 객체 컬렉션 | 구조화된 테이블(행·열 기반) |
| 최적화 | 없음 | Catalyst Optimizer 자동 최적화 |
| 표현 방식 | 함수형 코드(map, reduce) | SQL 또는 메서드 체인 |
| 사용 난이도 | 높음 | 직관적 |
DataFrame은 기본적으로 테이블 구조의 데이터셋이다.
각 행(Row)은 필드 이름과 타입을 갖고, 열(column)은 명시적으로 접근할 수 있다.
이를 통해 SQL 명령처럼 데이터를 쿼리할 수 있고, 대규모 데이터를 클러스터 전반에 분산 처리할 수 있다.
RDD가 SparkContext를 기반으로 했다면,
DataFrame과 SparkSQL은 SparkSession을 중심으로 동작한다.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
SparkSession은 SQL 쿼리, DataFrame 생성, 외부 데이터베이스 연결의 진입점이다..stop()으로 명시적으로 종료해야 한다.CSV, JSON, Parquet 등 다양한 포맷의 데이터를 읽을 수 있다.
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/fakefriends.csv")
df.printSchema()
df.show(5)
.option("inferSchema", "true")를 사용하면 스파크가 데이터의 타입을 자동 유추한다.
이 덕분에 헤더가 포함된 CSV 파일은 매우 쉽게 구조화할 수 있다.
DataFrame은 SQL 테이블처럼 사용할 수 있다.
임시 뷰를 만들어 SQL 명령으로 바로 접근 가능하다.
df.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagers.show()
.createOrReplaceTempView() : SQL 쿼리용 임시 테이블 생성spark.sql() : SQL 명령을 DataFrame 쿼리로 변환결과는 드라이버에 수집할 수도 있다.
results = teenagers.collect()
하지만 실제 클러스터 환경에서는 .show()나 .write()로 출력만 하는 편이 낫다.
.collect()는 모든 데이터를 드라이버로 가져오므로 리소스 부담이 크다.
SQL 명령을 사용하지 않아도, 동일한 기능을 메서드 체인으로 표현할 수 있다.
from pyspark.sql import functions as F
df.select("name", "age").filter(df.age < 21).orderBy("age").show()
df.groupBy("age").count().orderBy("age").show()
select() : 열 선택filter() : 조건 필터링groupBy().agg() : 그룹 연산orderBy() : 정렬show() : 상위 행 표시SQL 구문을 선호하지 않는 개발자에게는 이 방식이 더 익숙하다.
CSV에 헤더가 없거나, 타입을 명확히 지정해야 하는 경우 StructType을 사용한다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
schema = StructType([
StructField("stationID", StringType(), True),
StructField("date", IntegerType(), True),
StructField("measure_type", StringType(), True),
StructField("temperature", FloatType(), True)
])
df = spark.read.schema(schema).csv("data/1800.csv")
df.printSchema()
명시적 스키마는
문제:
각 나이별 평균 친구 수를 구하라. (fakefriends-header.csv)
핵심 단계
from pyspark.sql import functions as F
friends = spark.read.option("header", "true").option("inferSchema", "true").csv("fakefriends-header.csv")
avgFriends = friends.groupBy("age").agg(F.round(F.avg("friends"), 2).alias("avg_friends")).orderBy("age")
avgFriends.show()
핵심 포인트
.agg() : 그룹 집계F.avg(), F.round() : SQL 함수 호출.alias() : 계산 열 이름 지정이전 RDD 예제보다 코드량이 줄고, 최적화는 Catalyst 엔진이 자동 처리한다.
목표: RDD로 구현했던 단어 빈도 계산을 DataFrame과 SQL 함수로 구현.
from pyspark.sql import functions as F
lines = spark.read.text("book.txt")
words = lines.select(F.explode(F.split(F.col("value"), "\\W+")).alias("word"))
words = words.filter(words.word != "")
lowered = words.select(F.lower(F.col("word")).alias("word"))
wordCounts = lowered.groupBy("word").count().orderBy(F.desc("count"))
wordCounts.show(10)
explode + split → flatMap과 동일한 효과lower() → 대소문자 정규화.groupBy().count() → 단어 빈도 계산.orderBy(desc("count")) → 자주 등장한 단어 순으로 정렬이 방식은 SQL 기반 함수의 효율을 그대로 활용한다.
헤더 없는 CSV에서 명시적 스키마를 정의해 최저 온도 추출.
minTemps = df.filter(df.measure_type == "TMIN")
stationTemps = minTemps.select("stationID", "temperature")
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
minTempsByStation.withColumn("temperature_F",
F.round(F.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2)
).orderBy("temperature_F").show()
.withColumn() : 기존 데이터프레임에 새 열 추가.min() : 그룹 내 최소값 계산round() : 시각적 정리schema = StructType([
StructField("cust_ID", IntegerType(), True),
StructField("item_ID", IntegerType(), True),
StructField("amount_spent", FloatType(), True)
])
orders = spark.read.schema(schema).csv("customer-orders.csv")
totalSpent = orders.groupBy("cust_ID").agg(F.round(F.sum("amount_spent"), 2).alias("total_spent"))
totalSpent.orderBy(F.desc("total_spent")).show()
.sum()으로 고객별 지출 합산.orderBy(desc())로 상위 지출 고객 정렬.alias()로 명확한 열 이름 설정결과 예시:
| cust_ID | total_spent |
|---|---|
| 68 | 6375.45 |
| 45 | 5520.37 |
| 79 | 4928.53 |
SparkSQL과 DataFrame은 스파크의 현재이자 미래다.
RDD보다 간결하고, SQL 친화적이며, 구조화된 데이터에 적합하다.
기계학습(MLlib)과 스트리밍(Structured Streaming)도 이 기반 위에서 동작한다.
데이터를 SQL처럼 다루고 싶다면 DataFrame,
직접적인 함수형 처리가 필요하면 RDD를 사용하라.
두 인터페이스는 상호 변환 가능하며, 함께 사용될 때 가장 강력하다.
💡 핵심 요약