🏳️🌈 [궁금한점]
스키마는 데이터프레임의 구조(Structure) 를 설명하는 메타데이터이다.
스키마에는 다음 정보가 포함된다.
CSV 에서 읽은 데이터를 데이터프레임에 넣고 printSchema()함수를 호출하여 스키마를 출력해 볼 수 있다.
val df = spark.read.option("header", "true").csv("data.csv")
// 스키마 확인
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
데이터프레임(DataFrame)은 스키마를 가진 데이터 컬렉션 (RDD보다 구조화된 데이터)이다.즉, 스키마가 적용된 RDD라고 볼 수 있으며, SQL과 유사한 방식으로 데이터를 다룰 수 있다.
소스를 확인하면 RDD와 스키마의 입력으로 데이터프레임을 만들고 있다. show() 함수는 RDBMS의 테이블 처럼 데이터프레임의 내용을을 출력해 준다.
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// 스키마 정의
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("salary", DoubleType, true)
))
// 데이터를 Row 객체로 변환
val data = Seq(
Row("Alice", 25, 50000.0),
Row("Bob", 30, 60000.0)
)
// RDD에서 DataFrame 생성
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)
// 데이터 확인
df.show()
+-----+---+-------+
| name|age|salary |
+-----+---+-------+
|Alice| 25|50000.0|
| Bob| 30|60000.0|
+-----+---+-------+
RDD (Resilient Distributed Dataset)는 Apache Spark의 핵심 데이터 구조로, 불변(Immutable)하며 분산(Distributed)된 데이터 컬렉션이다. Spark에서 데이터를 분산 처리하고, 병렬 연산을 수행하는 기본 단위로 사용된다.
DataFrame과 Dataset이 더 최적화되어 있으므로 기본적으로는 RDD를 사용하지 않는 것이 좋다. 하지만 다음과 같은 경우에는 RDD가 유용할 수 있다.
Resilient (복구 가능)
Distributed (분산)
Immutable (불변성)
Lazy Evaluation (지연 실행)
Fault-Tolerant (장애 복구 가능)
RDD는 두 가지 연산을 지원합니다.
Transformation (변환 연산, Lazy Evaluation)
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
val squaredRdd = rdd.map(x => x * x) // Transformation (즉시 실행되지 않음)
Action (실행 연산, 즉시 실행)
val result = squaredRdd.collect() // Action (여기서 실제 실행됨)
println(result.mkString(", ")) // 출력: 1, 4, 9, 16, 25
RDD는 세 가지 방식으로 생성할 수 있습니다.
기존 데이터에서 생성
val data = Seq(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)
외부 파일에서 생성 (HDFS, S3, 로컬 파일)
val rdd = spark.sparkContext.textFile("hdfs://path/to/file.txt")
기존 RDD에서 변환
val rdd2 = rdd.map(x => x * 2) // 기존 RDD를 변환하여 새로운 RDD 생성
장점
단점
| 비교 항목 | RDD 방식 | DataFrame 방식 |
|---|---|---|
| 데이터 로드 | textFile()을 사용 (문자열 RDD) | read.csv() 사용 (구조화된 데이터) |
| 타입 변환 | map()을 사용하여 수동 변환 | 자동 스키마 처리 (csv("data.csv")) |
| 필터링 | filter() 사용 (case (_, age) => age >= 30) | .filter($"age" >= 30) |
| 출력 방식 | collect().foreach(println) | .show() (자동 포맷팅) |
// SparkContext를 이용하여 CSV 파일을 RDD로 로드
val rdd = spark.sparkContext.textFile("data.csv")
// CSV 파일에서 데이터를 파싱하여 (이름, 나이) 튜플로 변환
val parsedRdd = rdd.map(line => {
val cols = line.split(",")
(cols(0), cols(1).toInt) // (이름, 나이)
})
// 30세 이상 필터링
val filteredRdd = parsedRdd.filter { case (_, age) => age >= 30 }
// 결과 출력
filteredRdd.collect().foreach(println)
// SparkSession을 이용하여 CSV 파일을 DataFrame으로 로드
val df = spark.read.option("header", "true").csv("data.csv")
// 나이를 Integer로 변환 후, 30세 이상 필터링
val filteredDf = df.filter($"age" >= 30)
// 결과 출력
filteredDf.show()
Spark SQL은 Spark에서 SQL 쿼리를 실행할 수 있도록 지원하는 모듈이며, DataFrame은 Spark SQL의 핵심 데이터 구조이다.
즉, DataFrame은 Spark SQL의 일부이며, Spark SQL은 DataFrame을 SQL 방식으로 조작할 수 있는 기능을 제공한한다.
| 비교 항목 | DataFrame | Spark SQL |
|---|---|---|
| 정의 | 구조화된 데이터의 컬렉션 (RDD의 업그레이드 버전) | SQL 문법을 사용하여 데이터를 처리하는 Spark 모듈 |
| API 스타일 | Scala/Python API (df.filter(), df.select()) | SQL 쿼리 (spark.sql("SELECT * FROM df")) |
| 사용 목적 | 프로그래밍 방식으로 데이터 처리 | SQL 쿼리를 사용한 데이터 처리 |
| 최적화 지원 | Catalyst 옵티마이저 적용 | Catalyst 옵티마이저 적용 |
| 연산 방식 | DataFrame의 함수(select, filter, groupBy) 사용 | SQL을 DataFrame으로 변환 후 실행 |
// CSV 파일을 DataFrame으로 로드
val df = spark.read.option("header", "true").csv("data.csv")
// DataFrame을 SQL 테이블로 등록
df.createOrReplaceTempView("people")
// SQL 쿼리 실행
val sqlDf = spark.sql("SELECT name, age FROM people WHERE age >= 30")
// 결과 출력
sqlDf.show()
Spark SQL의 동작 과정
DataFrame API의 동작 과정
Spark SQL을 사용할 때
DataFrame을 사용할 때
둘을 함께 사용할 때
데이터 프레임 특징
데이터세트 특징
| 비교 항목 | DataFrame (Spark) | Dataset (Spark) |
|---|---|---|
| 타입 안정성 | 낮음 (동적 타입) | 높음 (정적 타입) |
| API 스타일 | SQL-like (Row 기반) | Functional (객체 기반) |
| 성능 | Catalyst 최적화 적용 | Tungsten 및 직렬화 최적화 적용 |
| 언어 지원 | Scala, Java, Python | Scala, Java |
| 사용 목적 | SQL, 배치 처리 | 타입 안정성이 중요한 연산 |
| 비교 항목 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 타입 안정성 | 없음 | 없음 | 있음 (타입 안전) |
| 메모리 효율성 | 낮음 (객체 저장) | 높음 (Binary Format) | 높음 (Binary Format + Encoders) |
| 최적화 적용 | 없음 | Catalyst, Tungsten | Catalyst, Tungsten |
| API 스타일 | 함수형 (map, filter) | SQL 스타일 | 함수형 + SQL |
| 성능 | 낮음 (직렬화/GC 비용 큼) | 높음 | 높음 |