<6월 15일 9-11시 학습한거 다시 보기>
카탈로그 > 스키마 > 테이블에서 '계보(Lineage)' 탭에서 업스트림(원본), 다운스트림(활용처)
Data lake: 온갖 다양한 원시 데이터를 저장하는 호수와 같은 공간
<-> Data warehouse: 정형 데이터 보관
정제된 데이터 저장할 때 (silver) 파티셔닝 전략 :
전체 덮어쓰기(overwrite)는 간단하지만 비효율적. 실무에서는 새로운 데이터만 처리하는 증분 방식이 일반적.
새 레코드를 기존 테이블에 추가. processing_date 같은 타임스탬프 컬럼으로 새 데이터를 식별.
df_new.write.format("delta") \
.mode("append") \
.saveAsTable(f"{CATALOG}.{SCHEMA}.bronze_diamonds")
존재하는 레코드는 업데이트, 새 레코드는 삽입. CDC(Change Data Capture) 시나리오에 이상적.
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
| 모드 | 언제 쓰나 | 특징 |
|---|---|---|
overwrite | 초기 적재, 실습 | 전체 재작성, 단순함 |
append | 로그, 이벤트 데이터 | 기존 유지, 중복 주의 |
merge | CDC, 마스터 데이터 | 가장 정교함 |
자주 필터링되는 컬럼을 파티션 키로 선택하여 불필요한 데이터 스캔을 줄임.
여러 번 사용되는 DataFrame은 cache() 또는 persist()로 메모리에 저장.
df.cache() # 메모리
df.persist() # 메모리 + 디스크 (대용량 적합)
df.unpersist() # 사용 후 해제
작은 테이블은 broadcast()를 사용하여 모든 노드에 복제하면 셔플을 피할 수 있음.
from pyspark.sql.functions import broadcast
df_result = df_large.join(broadcast(df_small), "key")
여기서 JOIN을 하려면 같은 key를 가진 데이터가 같은 노드에 있어야 합니다. 근데 지금은 흩어져 있으니까 네트워크로 데이터를 주고받아야 합니다. 이 과정이 셔플입니다.
노드 1 → 노드 2로 데이터 전송 ← 느림 😓
노드 3 → 노드 1로 데이터 전송 ← 느림 😓
브로드캐스트 조인은 어떻게 다르냐면
작은 테이블을 모든 노드에 통째로 복사해버립니다.
브로드캐스트 전:
노드 1: 주문 데이터만 있음
노드 2: 주문 데이터만 있음
노드 3: 주문 데이터만 있음
브로드캐스트 후:
노드 1: 주문 데이터 + 고객 테이블(복사본) ← 로컬에서 바로 JOIN
노드 2: 주문 데이터 + 고객 테이블(복사본) ← 로컬에서 바로 JOIN
노드 3: 주문 데이터 + 고객 테이블(복사본) ← 로컬에서 바로 JOIN
각 노드가 자기 데이터랑 로컬에 있는 복사본이랑 바로 JOIN하면 되니까 노드 간 데이터 전송(셔플)이 없어집니다.
필요한 컬럼만 select하여 메모리 사용량과 I/O를 최소화.
df = spark.read.table("diamonds").select("cut", "price", "carat")
가능한 한 일찍 filter를 적용하여 처리할 데이터 양을 줄임.
df = spark.read.table("diamonds").filter("cut = 'Ideal'").select("price")