
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, lit, array, when, col
# Creating Spark Session
# Spark 3.5 with Iceberg 1.5.0
DW_PATH='/iceberg/spark/warehouse'
spark = SparkSession.builder \
.master("local[4]") \
.appName("iceberg-poc") \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0')\
.config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')\
.config('spark.sql.catalog.local','org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.local.type','hadoop') \
.config('spark.sql.catalog.local.warehouse',DW_PATH) \
.getOrCreate()
t1 = spark.range(30000).withColumn("year",
when(col("id") <= 10000, lit(2023))\
.when(col("id").between(10001, 15000), lit(2024))\
.otherwise(lit(2025))
)
t1 = t1.withColumn("business_vertical", array(
lit("Retail"),
lit("SME"),
lit("Cor"),
lit("Analytics")
).getItem((rand()*4).cast("int")))\
.withColumn("is_updated", lit(False))
# Creating a table to store employees' business vertical info
TGT_TBL = "local.db.emp_bv_details"
t1.coalesce(1).writeTo(TGT_TBL).partitionedBy('year').using('iceberg')\
.tableProperty('format-version','2')\
.tableProperty('write.delete.mode','merge-on-read')\
.tableProperty('write.update.mode','merge-on-read')\
.tableProperty('write.merge.mode','merge-on-read')\
.create()
The emp_bv_details table is created.
# A new department called Sales is created and 3000 employees switch to it in 2025
updated_records = spark.range(15000, 18001).withColumn("year", lit(2025)).withColumn("business_vertical", lit("Sales"))
STG_TBL = "local.db.emp_bv_updates"
updated_records.coalesce(1).writeTo(STG_TBL).partitionedBy('year').using('iceberg')\
.tableProperty('format-version','2')\
.create()
# Merging the staging updates (employees who moved to Sales) into the target table
spark.sql(f"""
MERGE INTO {TGT_TBL} as tgt
USING (Select *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id
WHEN MATCHED AND src.business_vertical <> tgt.business_vertical AND tgt.year = src.year THEN
UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN
INSERT *
""")


MERGE INTO applies the WHEN MATCHED and WHEN NOT MATCHED clauses together to compute the new state of the table, which means an expensive shuffle.
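One way to see where that shuffle shows up is to inspect the physical plan of the statement; this is a minimal sketch, assuming EXPLAIN is supported for MERGE INTO on this Spark 3.5 / Iceberg setup, and it simply restates the merge above without the extra predicates.
# Inspect the physical plan of the MERGE and look for Exchange / Sort nodes
spark.sql(f"""
EXPLAIN
MERGE INTO {TGT_TBL} as tgt
USING (SELECT *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN INSERT *
""").show(truncate=False)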

Filters placed inside the WHEN MATCHED or WHEN NOT MATCHED clauses are not pushed down.
-- Adding a filter condition that will be pushed down
MERGE INTO TARGET_TABLE tgt
USING STAGE_TABLE stg
ON tgt.id = stg.id
AND tgt.year = 2025 -- All filters in ON Clause are pushed down.
WHEN MATCHED ...
WHEN NOT MATCHED ...

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

This is controlled by the spark.sql.join.preferSortMergeJoin setting, whose default value is true.
# Setting a table property to enable fanout writers
spark.sql(f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
'write.spark.fanout.enabled'='true'
)""")

What is local sorting?
Before writing data, Spark performs a local sort so that rows with the same partition value end up grouped together.
- In batch jobs the impact is relatively small,
- but in streaming jobs it introduces latency (a small sketch of this sort follows).
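As a concrete illustration, the clustering that this implicit local sort provides can be reproduced explicitly with sortWithinPartitions; this is only a sketch of what the non-fanout writer needs, not an extra step you have to add once fanout writers are enabled.
# Explicit equivalent of the implicit local sort: cluster rows by the
# partition column within each Spark task before they reach the writer
locally_sorted = updated_records.sortWithinPartitions("year")
locally_sorted.explain()  # the plan shows a local Sort node and no Exchange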
# Setting Distribution mode in SparkSession.
# This overwrites the Distribution mode mentioned in TBLPROPERTIES
spark.conf.set("spark.sql.iceberg.distribution-mode", "None")

none: Spark writes the data as it receives it, without requesting any distribution
- fastest
- can produce many small files
hash: data is hash-shuffled before being written
- moderate speed
- produces balanced files
range: data is range-shuffled and sorted before being written
- slowest
- can produce large, sorted files
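The same choice can be pinned at the table level rather than per session; write.distribution-mode is a standard Iceberg table property, shown here as a sketch with the none mode.
# Setting the distribution mode as a table property instead of a session config
spark.sql(f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
'write.distribution-mode'='none'
)""")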
Removing the Exchange node when writing to a table is not a simple optimization; it is a decision that trades write performance against read performance.
With distribution-mode = none, the small-file problem becomes even more severe on MOR tables.
Situations where the none mode is useful:
- only a single partition is being written
- compaction can be run frequently (a compaction call is sketched below)
- write performance matters more than read performance
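If none is used and small files accumulate, they can be merged afterwards with Iceberg's rewrite_data_files procedure; the where argument that limits compaction to a single partition is optional, and this call is only a sketch.
# Compacting small files produced by writes with distribution-mode 'none'
spark.sql("""
CALL local.system.rewrite_data_files(
table => 'db.emp_bv_details',
where => 'year = 2025'
)
""")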
# Enabling SPJ
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled','true')
MERGE INTO TARGET_TABLE tgt
USING STAGE_TABLE stg
ON tgt.id = stg.id
AND tgt.year = 2025
AND tgt.year = stg.year -- joining on the partition column to enable SPJ
WHEN MATCHED ...
WHEN NOT MATCHED ...

# Updating some employees' departments after setting the optimization configs
updated_records = spark.range(28000, 35001)\
.withColumn("year", lit(2025))\
.withColumn("business_vertical", lit("DataEngineering"))
updated_records.coalesce(1).writeTo(STG_TBL).overwritePartitions()
# To avoid the sort caused by Sort Merge Join, prefer a Hash Join when possible.
spark.conf.set('spark.sql.join.preferSortMergeJoin', 'false')
# To avoid a shuffle before writing into the table.
spark.conf.set("spark.sql.iceberg.distribution-mode", "None")
# Enabling SPJ
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled','true')
# To avoid pre-sorting before writing into the table
spark.sql(f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
'write.spark.fanout.enabled'='true'
)""")
spark.sql(f"""
MERGE INTO {TGT_TBL} as tgt
USING (SELECT *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id AND tgt.year = src.year AND tgt.year = 2025
WHEN MATCHED AND
tgt.business_vertical <> src.business_vertical
THEN
UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN
INSERT *
""")

A push-down filter on year = 2025 was added, and the year partition column was added to the join condition to enable SPJ.
The Exchange and Sort nodes are confirmed to be gone as well.
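Beyond inspecting the plan, the outcome of the merge itself can be sanity-checked with a quick aggregate over the target table; this is just an illustrative query over the columns created above.
# Quick sanity check of the merge result for the 2025 partition
spark.sql(f"""
SELECT business_vertical, is_updated, count(*) AS cnt
FROM {TGT_TBL}
WHERE year = 2025
GROUP BY business_vertical, is_updated
ORDER BY business_vertical, is_updated
""").show()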
