Spark, 데이터 표현계층

Jeonghak Cho·2025년 3월 31일

Spark

목록 보기
9/12

📗스파크의 데이터 표현 계층

🏳️‍🌈 [궁금한점]

  • 데이터 표현 계층 종류
    • 스키마, 데이터프레임,데이터세트,스파크SQL 개념

🔗[목차]

데이터 표현 모델

스키마

스키마는 데이터프레임의 구조(Structure) 를 설명하는 메타데이터이다.
스키마에는 다음 정보가 포함된다.

  • 컬럼명 (name, age, salary)
  • 컬럼유형 (StringType, IntegerType, DoubleType)
  • 컬럼 nullable(NULL 허용 여부) 여부

CSV 에서 읽은 데이터를 데이터프레임에 넣고 printSchema()함수를 호출하여 스키마를 출력해 볼 수 있다.

val df = spark.read.option("header", "true").csv("data.csv")

// 스키마 확인
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: double (nullable = true)

데이터프레임

데이터프레임(DataFrame)은 스키마를 가진 데이터 컬렉션 (RDD보다 구조화된 데이터)이다.즉, 스키마가 적용된 RDD라고 볼 수 있으며, SQL과 유사한 방식으로 데이터를 다룰 수 있다.
소스를 확인하면 RDD와 스키마의 입력으로 데이터프레임을 만들고 있다. show() 함수는 RDBMS의 테이블 처럼 데이터프레임의 내용을을 출력해 준다.

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// 스키마 정의
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("salary", DoubleType, true)
))

// 데이터를 Row 객체로 변환
val data = Seq(
  Row("Alice", 25, 50000.0),
  Row("Bob", 30, 60000.0)
)

// RDD에서 DataFrame 생성
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)

// 데이터 확인
df.show()

+-----+---+-------+
| name|age|salary |
+-----+---+-------+
|Alice| 25|50000.0|
|  Bob| 30|60000.0|
+-----+---+-------+

RDD

RDD (Resilient Distributed Dataset)는 Apache Spark의 핵심 데이터 구조로, 불변(Immutable)하며 분산(Distributed)된 데이터 컬렉션이다. Spark에서 데이터를 분산 처리하고, 병렬 연산을 수행하는 기본 단위로 사용된다.

DataFrame과 Dataset이 더 최적화되어 있으므로 기본적으로는 RDD를 사용하지 않는 것이 좋다. 하지만 다음과 같은 경우에는 RDD가 유용할 수 있다.

  • 비정형 데이터 처리: 텍스트, 바이너리 데이터를 직접 다뤄야 할 때
  • 사용자 정의 파티셔닝이 필요할 때: RDD의 partitionBy 활용 가능
  • Spark SQL/Dataset API가 지원하지 않는 연산이 필요할 때
  • 직접적인 분산 제어가 필요할 때: RDD는 더 세밀한 제어 가능

RDD의 주요 특징

Resilient (복구 가능)

  • 노드 장애가 발생해도 데이터가 손실되지 않음
  • 체크포인트나 Lineage(계보) 정보를 통해 재연산 가능

Distributed (분산)

  • 클러스터의 여러 노드에 데이터를 분산 저장
  • 각 노드에서 병렬 연산 수행

Immutable (불변성)

  • 한 번 생성된 RDD는 변경할 수 없음
  • 변환(Transformation) 연산을 수행하면 새로운 RDD가 생성됨

Lazy Evaluation (지연 실행)

  • Transformation(변환) 연산은 즉시 실행되지 않고 DAG(Directed Acyclic Graph)로 저장됨
  • Action(실행) 연산을 호출해야 실제 연산이 수행됨

Fault-Tolerant (장애 복구 가능)

  • RDD의 Lineage 정보를 이용해 장애 발생 시 데이터 복구 가능

RDD의 주요 연산

RDD는 두 가지 연산을 지원합니다.

Transformation (변환 연산, Lazy Evaluation)

  • 기존 RDD에서 새로운 RDD를 생성하는 연산 (지연 실행됨)
    예: map, filter, flatMap, reduceByKey, groupByKey
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
val squaredRdd = rdd.map(x => x * x)  // Transformation (즉시 실행되지 않음)

Action (실행 연산, 즉시 실행)

  • Transformation을 실행하고 결과를 반환하는 연산 (즉시 실행됨)
    예: collect, count, reduce, take, saveAsTextFile
val result = squaredRdd.collect()  // Action (여기서 실제 실행됨)
println(result.mkString(", "))  // 출력: 1, 4, 9, 16, 25

RDD의 생성 방법

RDD는 세 가지 방식으로 생성할 수 있습니다.
기존 데이터에서 생성

val data = Seq(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)

외부 파일에서 생성 (HDFS, S3, 로컬 파일)

val rdd = spark.sparkContext.textFile("hdfs://path/to/file.txt")

기존 RDD에서 변환

val rdd2 = rdd.map(x => x * 2)  // 기존 RDD를 변환하여 새로운 RDD 생성

RDD의 장점과 단점

장점

  • 직접적인 분산 제어 가능: 사용자 정의 파티셔닝, 커스텀 연산 가능
  • 유연성(Flexibility): 구조화되지 않은 데이터를 다룰 수 있음 (텍스트, 바이너리 파일 등)
  • 장애 복구(Fault-Tolerance): Lineage를 이용해 재생성 가능

단점

  • 메모리 사용 비효율적: DataFrame/Dataset보다 많은 메모리를 사용 (객체 저장)
  • 성능 최적화 부족: Catalyst 옵티마이저, Tungsten이 적용되지 않음
  • 타입 안정성 없음: Row 기반 연산이라 타입 검증이 안 됨

RDD 와 Dataframe 처리 방식 비교

비교 항목RDD 방식DataFrame 방식
데이터 로드textFile()을 사용 (문자열 RDD)read.csv() 사용 (구조화된 데이터)
타입 변환map()을 사용하여 수동 변환자동 스키마 처리 (csv("data.csv"))
필터링filter() 사용 (case (_, age) => age >= 30).filter($"age" >= 30)
출력 방식collect().foreach(println).show() (자동 포맷팅)
  • RDD 사용 예시시
// SparkContext를 이용하여 CSV 파일을 RDD로 로드
val rdd = spark.sparkContext.textFile("data.csv")

// CSV 파일에서 데이터를 파싱하여 (이름, 나이) 튜플로 변환
val parsedRdd = rdd.map(line => {
  val cols = line.split(",")
  (cols(0), cols(1).toInt)  // (이름, 나이)
})

// 30세 이상 필터링
val filteredRdd = parsedRdd.filter { case (_, age) => age >= 30 }

// 결과 출력
filteredRdd.collect().foreach(println)
  • 데이터프레임 방식
// SparkSession을 이용하여 CSV 파일을 DataFrame으로 로드
val df = spark.read.option("header", "true").csv("data.csv")

// 나이를 Integer로 변환 후, 30세 이상 필터링
val filteredDf = df.filter($"age" >= 30)

// 결과 출력
filteredDf.show()

Spark SQL

Spark SQL은 Spark에서 SQL 쿼리를 실행할 수 있도록 지원하는 모듈이며, DataFrame은 Spark SQL의 핵심 데이터 구조이다.
즉, DataFrame은 Spark SQL의 일부이며, Spark SQL은 DataFrame을 SQL 방식으로 조작할 수 있는 기능을 제공한한다.

DataFrame, Spark SQL 비교

비교 항목DataFrameSpark SQL
정의구조화된 데이터의 컬렉션 (RDD의 업그레이드 버전)SQL 문법을 사용하여 데이터를 처리하는 Spark 모듈
API 스타일Scala/Python API (df.filter(), df.select())SQL 쿼리 (spark.sql("SELECT * FROM df"))
사용 목적프로그래밍 방식으로 데이터 처리SQL 쿼리를 사용한 데이터 처리
최적화 지원Catalyst 옵티마이저 적용Catalyst 옵티마이저 적용
연산 방식DataFrame의 함수(select, filter, groupBy) 사용SQL을 DataFrame으로 변환 후 실행
  • Spark SQL 사용 예시
// CSV 파일을 DataFrame으로 로드
val df = spark.read.option("header", "true").csv("data.csv")

// DataFrame을 SQL 테이블로 등록
df.createOrReplaceTempView("people")

// SQL 쿼리 실행
val sqlDf = spark.sql("SELECT name, age FROM people WHERE age >= 30")

// 결과 출력
sqlDf.show()

DataFrame과 Spark SQL의 내부 동작

Spark SQL의 동작 과정

  • spark.sql("SELECT * FROM people") 실행
  • 내부적으로 SQL Parser가 실행되어 DataFrame 연산으로 변환
  • DataFrame API에서 Catalyst 옵티마이저가 최적화 수행
  • Spark 엔진에서 실행

DataFrame API의 동작 과정

  • df.filter($"age" >= 30).select("name", "age") 실행
  • Catalyst 옵티마이저가 실행되어 SQL 논리 플랜으로 변환
  • Spark 엔진에서 실행

언제 DataFrame vs Spark SQL을 사용할까?

Spark SQL을 사용할 때

  • SQL 전문가가 데이터를 다룰 때
  • 복잡한 조인, 서브쿼리 등의 SQL을 사용해야 할 때
  • 다양한 BI 도구(Tableau, Power BI)와 연결할 때

DataFrame을 사용할 때

  • 프로그래밍 방식으로 데이터 처리가 필요할 때
  • 성능 최적화가 중요한 경우 (함수형 API 활용 가능)
  • RDD보다 효율적인 데이터 처리를 원할 때

둘을 함께 사용할 때

  • DataFrame으로 데이터를 조작한 후, SQL을 실행할 때 (createOrReplaceTempView 활용)
  • SQL 기반의 데이터 처리 후, DataFrame API를 활용하여 추가 변환을 수행할 때

데이터세트

데이터 프레임 특징

  • 동적 타입 지원 (타입 안정성이 낮음)
  • 데이터 변환 및 조작이 쉬움
  • 메모리 내에서 연산이 이루어짐 (Spark에서는 Tungsten 최적화 적용)
  • 실행 시점에 타입 체크
  • Spark DataFrame은 Catalyst 옵티마이저를 활용하여 SQL-like 최적화 수행

데이터세트 특징

  • 강한 타입 지원 (타입 안정성이 높음)
  • 객체 형태로 데이터 다룸 (Java/Scala의 case class와 연동 가능)
    • => Dataset에서는 데이터를 Row 타입이 아니라 사용자 정의 클래스로 변환하여 다룰 수 있음
  • 런타임 오류 방지 (컴파일 시점 타입 체크)
  • 메모리 및 네트워크 효율성이 좋음 (직렬화/역직렬화 최적화)
  • 단, Python에서는 지원되지 않음 (Scala/Java만 지원)

DataFrame vs Dataset 비교

비교 항목DataFrame (Spark)Dataset (Spark)
타입 안정성낮음 (동적 타입)높음 (정적 타입)
API 스타일SQL-like (Row 기반)Functional (객체 기반)
성능Catalyst 최적화 적용Tungsten 및 직렬화 최적화 적용
언어 지원Scala, Java, PythonScala, Java
사용 목적SQL, 배치 처리타입 안정성이 중요한 연산

RDD vs DataFrame vs Dataset 비교

비교 항목RDDDataFrameDataset
타입 안정성없음없음있음 (타입 안전)
메모리 효율성낮음 (객체 저장)높음 (Binary Format)높음 (Binary Format + Encoders)
최적화 적용없음Catalyst, TungstenCatalyst, Tungsten
API 스타일함수형 (map, filter)SQL 스타일함수형 + SQL
성능낮음 (직렬화/GC 비용 큼)높음높음

0개의 댓글