PySpark 4 - Reader & Writer

no-glass-otacku·7일 전

MS data school

목록 보기
22/25

📌 핵심 개념 한눈에 보기

방향인터페이스접근 방법
파일 → DataFrameDataFrameReaderspark.read
DataFrame → 파일/테이블DataFrameWriterdf.write

비유: Reader는 "데이터를 가져오는 택배 수령", Writer는 "데이터를 보내는 택배 발송"


1. DataFrameReader — 데이터 읽기

기본 패턴

df = (spark
      .read
      .option("옵션이름",)   # 옵션 추가 (체이닝 가능)
      .schema(스키마)           # 스키마 직접 지정 (선택)
      .csv("파일경로")          # 파일 형식 메서드로 마무리
     )

1-1. CSV 읽기

방법 A: .option() 체이닝

users_df = (spark
            .read
            .option("sep", "\t")        # 구분자 (기본값은 쉼표 ",")
            .option("header", True)     # 첫 줄을 컬럼명으로 사용
            .option("inferSchema", True) # 타입 자동 추론
            .csv("파일경로")
           )

방법 B: csv() 메서드에 직접 인자 전달

users_df = (spark
            .read
            .csv("파일경로", sep="\t", header=True, inferSchema=True)
           )

두 방법은 동일한 결과. 취향에 따라 선택

주요 option 인자 정리

옵션 키설명예시 값
"sep"구분자",", "\t", `"
"header"첫 줄 = 컬럼명 여부True / False
"inferSchema"데이터 타입 자동 추론True / False

⚠️ inferSchema=True는 전체 데이터를 한 번 스캔하므로 느림. 대용량이면 스키마 직접 지정 권장


1-2. JSON 읽기

events_df = (spark
             .read
             .option("inferSchema", True)  # 타입 자동 추론
             .json("파일경로")
            )

1-3. 스키마 직접 정의 — StructType 방식

비유: 테이블의 설계도를 미리 그려서 "이 건물은 이렇게 지어야 해"라고 Spark에게 알려주는 것

Import 먼저

from pyspark.sql.types import (
    LongType, StringType, DoubleType, IntegerType,
    ArrayType, StructType, StructField
)

기본 구조

스키마 = StructType([
    StructField("컬럼명", 데이터타입(), nullable여부),
    StructField("컬럼명", 데이터타입(), nullable여부),
    ...
])
인자설명
첫 번째컬럼 이름 (문자열)
두 번째데이터 타입 (아래 타입 표 참고)
세 번째True = null 허용, False = null 불허

주요 데이터 타입

타입설명예시
StringType()문자열"hello"
LongType()큰 정수 (64bit)123456789
IntegerType()일반 정수 (32bit)42
DoubleType()소수 (64bit)3.14
StructType([...])중첩 구조 (object 안에 object){"city": "Seoul"}
ArrayType(타입)배열["a", "b", "c"]

사용 예시 (단순 CSV용)

schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("email", StringType(), True)
])

df = spark.read.option("sep", "\t").option("header", True).schema(schema).csv("경로")

중첩 구조 예시 (JSON용 — StructType 안에 StructType)

schema = StructType([
    StructField("device", StringType(), True),
    StructField("geo", StructType([           # ← 중첩 struct
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True),
    StructField("items", ArrayType(           # ← 배열 안에 struct
        StructType([
            StructField("item_id", StringType(), True),
            StructField("quantity", LongType(), True)
        ])
    ), True)
])

1-4. 스키마 직접 정의 — DDL 문자열 방식

더 짧고 간단한 대안. SQL의 CREATE TABLE 문법과 유사

ddl_schema = "user_id string, user_first_touch_timestamp long, email string"

df = spark.read.option("sep", "\t").option("header", True).schema(ddl_schema).csv("경로")

1-5. toDDL 트릭 — 스키마 자동 생성 (개발용)

목적: 복잡한 JSON의 스키마를 처음부터 손으로 쓰기 어려울 때, Spark가 자동 추론한 스키마를 DDL 문자열로 뽑아주는 편법

⚠️ 개발/탐색 단계에서만 사용. 운영 환경에서는 금지 (inferSchema는 전체 데이터를 읽어서 느림)

# Step 1: Python에서 경로를 Spark config로 공유
spark.conf.set("com.scope.events_path", "파일경로")
// Step 2: Scala 셀에서 실행 (%scala 마법 명령어 필요)
val path = spark.conf.get("com.scope.events_path")
val schema = spark.read.option("inferSchema", true).json(path).schema.toDDL
println(schema)
# Step 3: 출력된 DDL 문자열을 Python 변수에 붙여넣기
events_schema = "`device` STRING, `event_name` STRING, ..."  # 복사한 값

df = spark.read.schema(events_schema).json("경로")

2. DataFrameWriter — 데이터 쓰기

기본 패턴

(df
 .write
 .option("옵션이름",)   # 선택
 .mode("쓰기 모드")        # 중요!
 .저장메서드("경로 또는 테이블명")
)

2-1. 파일로 저장 — Parquet

(users_df
 .write
 .option("compression", "snappy")  # 압축 방식
 .mode("overwrite")                # 기존 파일 덮어쓰기
 .parquet("저장경로")
)

# 또는 메서드에 직접 인자 전달
(users_df
 .write
 .parquet("저장경로", compression="snappy", mode="overwrite")
)

mode() 옵션 정리

설명
"overwrite"기존 데이터 삭제 후 새로 씀
"append"기존 데이터에 추가
"ignore"이미 있으면 아무것도 안 함
"error"이미 있으면 오류 발생 (기본값)

2-2. 테이블로 저장 — saveAsTable

events_df.write.mode("overwrite").saveAsTable("테이블명")

createOrReplaceTempView vs saveAsTable 차이

createOrReplaceTempViewsaveAsTable
범위현재 세션만 (로컬)전체 작업공간 (전역)
영속성세션 종료 시 사라짐영구 저장

2-3. Delta 테이블로 저장

(events_df
 .write
 .format("delta")     # 저장 형식을 Delta로 지정
 .mode("overwrite")
 .save("저장경로")
)

3. Delta Lake — 왜 써야 하나?

비유: 일반 Parquet은 그냥 파일 보관함, Delta Lake는 "버전 관리되는 구글 드라이브"

기능설명
ACID 트랜잭션중간에 실패해도 데이터 깨지지 않음
Time Travel과거 버전 데이터 조회 가능
스키마 적용잘못된 형식의 데이터 자동 차단
스트리밍 + 배치 통합실시간/일괄 처리 모두 지원
Parquet 기반Spark API와 호환

Databricks에서는 거의 항상 Delta Lake 사용 권장


4. 결과 확인 방법

# 스키마 확인
df.printSchema()

# 데이터 미리보기
display(df)         # Databricks 전용 (테이블 형태)
df.show(5)          # 일반 Spark (텍스트 형태)

# 저장된 파일 목록 확인
display(dbutils.fs.ls("저장경로"))

5. 전체 흐름 요약

CSV/JSON 파일
    ↓  spark.read.option().schema().csv() / .json()
DataFrame (메모리 내 처리)
    ↓  df.write.mode().parquet() / .saveAsTable() / .format("delta").save()
Parquet 파일 / 테이블 / Delta 테이블
profile
이제 개발해야지...

0개의 댓글