1️⃣ SparkSQL과 DataFrame 개요

스파크 SQL은 현대 스파크의 중심 축이다.
RDD 기반 연산보다 훨씬 효율적이고, 구조화된 데이터 처리를 위한 최적화된 API를 제공한다.
이 API의 핵심이 바로 DataFrame이다.

  • RDD vs DataFrame

    항목RDDDataFrame
    데이터 구조비구조화된 객체 컬렉션구조화된 테이블(행·열 기반)
    최적화없음Catalyst Optimizer 자동 최적화
    표현 방식함수형 코드(map, reduce)SQL 또는 메서드 체인
    사용 난이도높음직관적

DataFrame은 기본적으로 테이블 구조의 데이터셋이다.
각 행(Row)은 필드 이름과 타입을 갖고, 열(column)은 명시적으로 접근할 수 있다.
이를 통해 SQL 명령처럼 데이터를 쿼리할 수 있고, 대규모 데이터를 클러스터 전반에 분산 처리할 수 있다.


2️⃣ SparkSession과 SQL 인터페이스

RDD가 SparkContext를 기반으로 했다면,
DataFrame과 SparkSQL은 SparkSession을 중심으로 동작한다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
  • SparkSession은 SQL 쿼리, DataFrame 생성, 외부 데이터베이스 연결의 진입점이다.
  • 세션은 .stop()으로 명시적으로 종료해야 한다.

CSV, JSON, Parquet 등 다양한 포맷의 데이터를 읽을 수 있다.

df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/fakefriends.csv")
df.printSchema()
df.show(5)

.option("inferSchema", "true")를 사용하면 스파크가 데이터의 타입을 자동 유추한다.
이 덕분에 헤더가 포함된 CSV 파일은 매우 쉽게 구조화할 수 있다.


3️⃣ SQL 쿼리로 DataFrame 다루기

DataFrame은 SQL 테이블처럼 사용할 수 있다.
임시 뷰를 만들어 SQL 명령으로 바로 접근 가능하다.

df.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagers.show()
  • .createOrReplaceTempView() : SQL 쿼리용 임시 테이블 생성
  • spark.sql() : SQL 명령을 DataFrame 쿼리로 변환
  • 반환값은 DataFrame이므로, 추가 연산 체이닝 가능

결과는 드라이버에 수집할 수도 있다.

results = teenagers.collect()

하지만 실제 클러스터 환경에서는 .show().write()로 출력만 하는 편이 낫다.
.collect()는 모든 데이터를 드라이버로 가져오므로 리소스 부담이 크다.


4️⃣ SQL 없이 DataFrame API 사용하기

SQL 명령을 사용하지 않아도, 동일한 기능을 메서드 체인으로 표현할 수 있다.

from pyspark.sql import functions as F

df.select("name", "age").filter(df.age < 21).orderBy("age").show()
df.groupBy("age").count().orderBy("age").show()
  • select() : 열 선택
  • filter() : 조건 필터링
  • groupBy().agg() : 그룹 연산
  • orderBy() : 정렬
  • show() : 상위 행 표시

SQL 구문을 선호하지 않는 개발자에게는 이 방식이 더 익숙하다.


5️⃣ 명시적 스키마 정의

CSV에 헤더가 없거나, 타입을 명확히 지정해야 하는 경우 StructType을 사용한다.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

schema = StructType([
    StructField("stationID", StringType(), True),
    StructField("date", IntegerType(), True),
    StructField("measure_type", StringType(), True),
    StructField("temperature", FloatType(), True)
])

df = spark.read.schema(schema).csv("data/1800.csv")
df.printSchema()

명시적 스키마는

  • 자동 타입 추론의 오차를 줄이고
  • 쿼리 최적화를 향상시키며
  • 대용량 데이터 로드 속도를 높인다.

6️⃣ 실습 ① — 연령별 평균 친구 수 계산

문제:
각 나이별 평균 친구 수를 구하라. (fakefriends-header.csv)

핵심 단계

from pyspark.sql import functions as F

friends = spark.read.option("header", "true").option("inferSchema", "true").csv("fakefriends-header.csv")

avgFriends = friends.groupBy("age").agg(F.round(F.avg("friends"), 2).alias("avg_friends")).orderBy("age")
avgFriends.show()

핵심 포인트

  • .agg() : 그룹 집계
  • F.avg(), F.round() : SQL 함수 호출
  • .alias() : 계산 열 이름 지정

이전 RDD 예제보다 코드량이 줄고, 최적화는 Catalyst 엔진이 자동 처리한다.


7️⃣ 실습 ② — Word Count (데이터프레임 버전)

목표: RDD로 구현했던 단어 빈도 계산을 DataFrame과 SQL 함수로 구현.

from pyspark.sql import functions as F

lines = spark.read.text("book.txt")
words = lines.select(F.explode(F.split(F.col("value"), "\\W+")).alias("word"))
words = words.filter(words.word != "")
lowered = words.select(F.lower(F.col("word")).alias("word"))
wordCounts = lowered.groupBy("word").count().orderBy(F.desc("count"))
wordCounts.show(10)
  • explode + splitflatMap과 동일한 효과
  • lower() → 대소문자 정규화
  • .groupBy().count() → 단어 빈도 계산
  • .orderBy(desc("count")) → 자주 등장한 단어 순으로 정렬

이 방식은 SQL 기반 함수의 효율을 그대로 활용한다.


8️⃣ 실습 ③ — 기상 데이터의 최저 온도 계산

헤더 없는 CSV에서 명시적 스키마를 정의해 최저 온도 추출.

minTemps = df.filter(df.measure_type == "TMIN")
stationTemps = minTemps.select("stationID", "temperature")
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")

minTempsByStation.withColumn("temperature_F",
                             F.round(F.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2)
                            ).orderBy("temperature_F").show()
  • .withColumn() : 기존 데이터프레임에 새 열 추가
  • .min() : 그룹 내 최소값 계산
  • 섭씨 → 화씨 변환
  • round() : 시각적 정리

9️⃣ 실습 ④ — 고객별 총 지출액 계산

schema = StructType([
    StructField("cust_ID", IntegerType(), True),
    StructField("item_ID", IntegerType(), True),
    StructField("amount_spent", FloatType(), True)
])

orders = spark.read.schema(schema).csv("customer-orders.csv")
totalSpent = orders.groupBy("cust_ID").agg(F.round(F.sum("amount_spent"), 2).alias("total_spent"))
totalSpent.orderBy(F.desc("total_spent")).show()
  • .sum()으로 고객별 지출 합산
  • .orderBy(desc())로 상위 지출 고객 정렬
  • .alias()로 명확한 열 이름 설정

결과 예시:

cust_IDtotal_spent
686375.45
455520.37
794928.53

🔟 요약

SparkSQL과 DataFrame은 스파크의 현재이자 미래다.
RDD보다 간결하고, SQL 친화적이며, 구조화된 데이터에 적합하다.
기계학습(MLlib)과 스트리밍(Structured Streaming)도 이 기반 위에서 동작한다.

데이터를 SQL처럼 다루고 싶다면 DataFrame,
직접적인 함수형 처리가 필요하면 RDD를 사용하라.
두 인터페이스는 상호 변환 가능하며, 함께 사용될 때 가장 강력하다.


💡 핵심 요약

  • SparkSession은 SQL 진입점
  • DataFrame = 분산 테이블
  • SQL과 메서드 체인 둘 다 지원
  • 명시적 스키마와 inferSchema 둘 다 활용
  • DataFrame은 Catalyst Optimizer로 자동 최적화
profile
okorion's Tech Study Blog.

0개의 댓글