| 방향 | 인터페이스 | 접근 방법 |
|---|---|---|
| 파일 → DataFrame | DataFrameReader | spark.read |
| DataFrame → 파일/테이블 | DataFrameWriter | df.write |
비유: Reader는 "데이터를 가져오는 택배 수령", Writer는 "데이터를 보내는 택배 발송"
df = (spark
.read
.option("옵션이름", 값) # 옵션 추가 (체이닝 가능)
.schema(스키마) # 스키마 직접 지정 (선택)
.csv("파일경로") # 파일 형식 메서드로 마무리
)
.option() 체이닝users_df = (spark
.read
.option("sep", "\t") # 구분자 (기본값은 쉼표 ",")
.option("header", True) # 첫 줄을 컬럼명으로 사용
.option("inferSchema", True) # 타입 자동 추론
.csv("파일경로")
)
users_df = (spark
.read
.csv("파일경로", sep="\t", header=True, inferSchema=True)
)
두 방법은 동일한 결과. 취향에 따라 선택
| 옵션 키 | 설명 | 예시 값 |
|---|---|---|
"sep" | 구분자 | ",", "\t", `" |
"header" | 첫 줄 = 컬럼명 여부 | True / False |
"inferSchema" | 데이터 타입 자동 추론 | True / False |
⚠️
inferSchema=True는 전체 데이터를 한 번 스캔하므로 느림. 대용량이면 스키마 직접 지정 권장
events_df = (spark
.read
.option("inferSchema", True) # 타입 자동 추론
.json("파일경로")
)
비유: 테이블의 설계도를 미리 그려서 "이 건물은 이렇게 지어야 해"라고 Spark에게 알려주는 것
from pyspark.sql.types import (
LongType, StringType, DoubleType, IntegerType,
ArrayType, StructType, StructField
)
스키마 = StructType([
StructField("컬럼명", 데이터타입(), nullable여부),
StructField("컬럼명", 데이터타입(), nullable여부),
...
])
| 인자 | 설명 |
|---|---|
| 첫 번째 | 컬럼 이름 (문자열) |
| 두 번째 | 데이터 타입 (아래 타입 표 참고) |
| 세 번째 | True = null 허용, False = null 불허 |
| 타입 | 설명 | 예시 |
|---|---|---|
StringType() | 문자열 | "hello" |
LongType() | 큰 정수 (64bit) | 123456789 |
IntegerType() | 일반 정수 (32bit) | 42 |
DoubleType() | 소수 (64bit) | 3.14 |
StructType([...]) | 중첩 구조 (object 안에 object) | {"city": "Seoul"} |
ArrayType(타입) | 배열 | ["a", "b", "c"] |
schema = StructType([
StructField("user_id", StringType(), True),
StructField("user_first_touch_timestamp", LongType(), True),
StructField("email", StringType(), True)
])
df = spark.read.option("sep", "\t").option("header", True).schema(schema).csv("경로")
schema = StructType([
StructField("device", StringType(), True),
StructField("geo", StructType([ # ← 중첩 struct
StructField("city", StringType(), True),
StructField("state", StringType(), True)
]), True),
StructField("items", ArrayType( # ← 배열 안에 struct
StructType([
StructField("item_id", StringType(), True),
StructField("quantity", LongType(), True)
])
), True)
])
더 짧고 간단한 대안. SQL의 CREATE TABLE 문법과 유사
ddl_schema = "user_id string, user_first_touch_timestamp long, email string"
df = spark.read.option("sep", "\t").option("header", True).schema(ddl_schema).csv("경로")
목적: 복잡한 JSON의 스키마를 처음부터 손으로 쓰기 어려울 때, Spark가 자동 추론한 스키마를 DDL 문자열로 뽑아주는 편법
⚠️ 개발/탐색 단계에서만 사용. 운영 환경에서는 금지 (inferSchema는 전체 데이터를 읽어서 느림)
# Step 1: Python에서 경로를 Spark config로 공유
spark.conf.set("com.scope.events_path", "파일경로")
// Step 2: Scala 셀에서 실행 (%scala 마법 명령어 필요)
val path = spark.conf.get("com.scope.events_path")
val schema = spark.read.option("inferSchema", true).json(path).schema.toDDL
println(schema)
# Step 3: 출력된 DDL 문자열을 Python 변수에 붙여넣기
events_schema = "`device` STRING, `event_name` STRING, ..." # 복사한 값
df = spark.read.schema(events_schema).json("경로")
(df
.write
.option("옵션이름", 값) # 선택
.mode("쓰기 모드") # 중요!
.저장메서드("경로 또는 테이블명")
)
(users_df
.write
.option("compression", "snappy") # 압축 방식
.mode("overwrite") # 기존 파일 덮어쓰기
.parquet("저장경로")
)
# 또는 메서드에 직접 인자 전달
(users_df
.write
.parquet("저장경로", compression="snappy", mode="overwrite")
)
| 값 | 설명 |
|---|---|
"overwrite" | 기존 데이터 삭제 후 새로 씀 |
"append" | 기존 데이터에 추가 |
"ignore" | 이미 있으면 아무것도 안 함 |
"error" | 이미 있으면 오류 발생 (기본값) |
events_df.write.mode("overwrite").saveAsTable("테이블명")
createOrReplaceTempView vs saveAsTable 차이
| createOrReplaceTempView | saveAsTable | |
|---|---|---|
| 범위 | 현재 세션만 (로컬) | 전체 작업공간 (전역) |
| 영속성 | 세션 종료 시 사라짐 | 영구 저장 |
(events_df
.write
.format("delta") # 저장 형식을 Delta로 지정
.mode("overwrite")
.save("저장경로")
)
비유: 일반 Parquet은 그냥 파일 보관함, Delta Lake는 "버전 관리되는 구글 드라이브"
| 기능 | 설명 |
|---|---|
| ACID 트랜잭션 | 중간에 실패해도 데이터 깨지지 않음 |
| Time Travel | 과거 버전 데이터 조회 가능 |
| 스키마 적용 | 잘못된 형식의 데이터 자동 차단 |
| 스트리밍 + 배치 통합 | 실시간/일괄 처리 모두 지원 |
| Parquet 기반 | Spark API와 호환 |
Databricks에서는 거의 항상 Delta Lake 사용 권장
# 스키마 확인
df.printSchema()
# 데이터 미리보기
display(df) # Databricks 전용 (테이블 형태)
df.show(5) # 일반 Spark (텍스트 형태)
# 저장된 파일 목록 확인
display(dbutils.fs.ls("저장경로"))
CSV/JSON 파일
↓ spark.read.option().schema().csv() / .json()
DataFrame (메모리 내 처리)
↓ df.write.mode().parquet() / .saveAsTable() / .format("delta").save()
Parquet 파일 / 테이블 / Delta 테이블