An open-source project that lets you build a Lakehouse architecture on top of a data lake
Figure: Bronze, Silver, and Gold layers of the Lakehouse architecture (image source: delta.io)
Overall performance
TPC-DS is a standard benchmark for DS (Decision Support) systems; it consists of a dataset and 99 queries that are used to measure performance.
Source: https://databeans-blogs.medium.com/delta-vs-iceberg-vs-hudi-reassessing-performance-cb8157005eb0
Environment: Ubuntu 22.04
Development languages: Python 3.10.6, Java 17 (openjdk)
# sudo apt update
# sudo apt install openjdk-17-jdk
# vi .bashrc
export JAVA_HOME='/usr/lib/jvm/java-1.17.0-openjdk-amd64'
export PATH=$PATH:$JAVA_HOME/bin
# source ~/.bashrc
# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-1.17.0-openjdk-amd64/bin
# pip install delta-spark==2.2.0
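After the install, a quick check like the one below can confirm that both packages are importable and which versions were resolved (a minimal sketch; delta-spark 2.2.0 is expected to resolve a Spark 3.3.x pyspark).
# Sanity check: confirm pyspark and delta-spark are installed and importable
import importlib.metadata as md
import pyspark
print("pyspark:", pyspark.__version__)            # e.g. 3.3.x for delta-spark 2.2.0
print("delta-spark:", md.version("delta-spark"))  # e.g. 2.2.0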
• Setting delta.compatibility.symlinkFormatManifest.enabled=true makes Delta update the manifest files (explained later) automatically every time data is inserted into the table (see the sketch after this bullet).
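For reference, the same property can also be enabled on a table that already exists, and a manifest can be generated explicitly. A minimal sketch, assuming a SparkSession named spark already configured for Delta (as built in the next snippet) and using the example table path from later in this post:
from delta.tables import DeltaTable

# Enable automatic manifest updates on an existing table
spark.sql("""
    ALTER TABLE delta.`/home/user/data/delta-table-create`
    SET TBLPROPERTIES ('delta.compatibility.symlinkFormatManifest.enabled' = 'true')
""")

# Generate the symlink-format manifest once, on demand
DeltaTable.forPath(spark, "/home/user/data/delta-table-create").generate("symlink_format_manifest")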
import pyspark
from delta import *

# Build a SparkSession with the Delta Lake SQL extension and Delta catalog enabled
builder = pyspark.sql.SparkSession.builder.appName("MyApp3") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Log Level
spark.sparkContext.setLogLevel("WARN") # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
# Table path and name
data_path = "/home/user/data/delta-table-create"
# Create Table (DataFrame API alternative; commented out in favor of the SQL version below)
# data = spark.range(0, 5)
# data.write.format("delta").save(data_path)
# Create Table(SQL)
create_sql = "CREATE TABLE IF NOT EXISTS delta.`/home/user/data/delta-table-create` ( \
`key` STRING, \
`value` STRING, \
`topic` STRING, \
`timestamp` TIMESTAMP, \
`date` STRING \
) \
USING DELTA \
PARTITIONED BY (date) \
LOCATION '/home/user/data/delta-table-create' \
TBLPROPERTIES ( \
'delta.compatibility.symlinkFormatManifest.enabled'='true' \
)"
spark_created_sql = spark.sql(create_sql)
df = spark.read.format("delta").load(data_path)
df.show()
The table is confirmed to have been created, as shown below.
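Right after creation the table is empty, so a small append such as the following can be used to check that partitioned writes work. A minimal sketch with made-up sample rows matching the schema above:
from datetime import datetime

# Hypothetical sample rows: (key, value, topic, timestamp, date)
rows = [
    ("k1", "v1", "topic-a", datetime(2023, 1, 1, 12, 0, 0), "2023-01-01"),
    ("k2", "v2", "topic-b", datetime(2023, 1, 2, 12, 0, 0), "2023-01-02"),
]
sample_df = spark.createDataFrame(rows, ["key", "value", "topic", "timestamp", "date"])

# Append to the existing table; the `date` partitioning is already defined on the table
sample_df.write.format("delta").mode("append").save(data_path)
spark.read.format("delta").load(data_path).show()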
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp3") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Log Level
spark.sparkContext.setLogLevel("WARN") # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
# Table path and name
data_path = "/home/user/data/delta-table-quickstart"
#Create Table
data = spark.range(0, 5)
data.write.format("delta").save(data_path)
# Read Data
df = spark.read.format("delta").load(data_path)
df.show()
# Overwrite Data (replaces the existing rows)
data = spark.range(30, 40)
data.write.format("delta").mode("overwrite").save(data_path)
read_df = spark.read.format("delta").load(data_path)
read_df.show()
As shown below, you can see both the initially created data and the overwritten data.
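For comparison, mode("overwrite") above replaced the original rows, while mode("append") keeps them and adds new ones. A minimal sketch against the same table:
# Append keeps existing rows and adds new ones (contrast with the overwrite above)
extra = spark.range(100, 103)
extra.write.format("delta").mode("append").save(data_path)
spark.read.format("delta").load(data_path).orderBy("id").show()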
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp3") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Log Level
spark.sparkContext.setLogLevel("WARN") # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
# Table path and name
data_path = "/home/user/data/delta-table-timetravel"
#Create Table
data = spark.range(0, 5)
data.write.format("delta").save(data_path)
# History select
history = spark.sql("DESCRIBE HISTORY delta.`/home/user/data/delta-table-timetravel`")
latest_version = history.selectExpr("max(version)").collect()
print("latest_version >>>>>>>>>>>>>>>: {}".format(latest_version))
# Read Data
print("########### current ###########")
df = spark.read.format("delta").load(data_path)
df.show()
print("########### version 0 ###########")
df0 = spark.read.format("delta").option("versionAsOf", 0).load(data_path)
df0.show()
# Overwrite Data (this creates a new table version)
data = spark.range(30, 40)
data.write.format("delta").mode("overwrite").save(data_path)
print("########### version 0 ###########")
updatedf = spark.read.format("delta").option("versionAsOf", 0).load(data_path)
updatedf.show()
print("########### current version 1 ###########")
updatedf1 = spark.read.format("delta").option("versionAsOf", 1).load(data_path)
updatedf1.show()
print("########### current ###########")
df1 = spark.read.format("delta").load(data_path)
df1.show()
history = spark.sql("DESCRIBE HISTORY delta.`/home/user/data/delta-table-timetravel`")
latest_version = history.selectExpr("max(version)").collect()
print("latest_version >>>>>>>>>>>>>>>: {}".format(latest_version))
from delta.tables import *
from pyspark.sql.functions import *
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp2") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("WARN") # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
data_path = "/home/user/data/delta-table-quickstart"
print("################ origin data ################")
deltaTable = DeltaTable.forPath(spark, data_path)
deltaTable.toDF().show()
print("################ update data ################")
# Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
deltaTable.toDF().show()
print("################ delete data ################")
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF().show()
# Upsert (merge) new data
newData = spark.range(0, 20)
print("################ update insert data ################")
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
As shown below, the loaded data has been updated, deleted, and upserted.
In the upsert (merge) step you can see that existing rows were updated and new rows were inserted.
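Every operation above also created a new table version, so as a follow-up the commit history can be inspected and unreferenced data files cleaned up. A minimal sketch (the zero-hour retention and the disabled retention check are for local experimentation only):
# Inspect the commit history produced by the update/delete/merge above
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)

# Allow a retention window shorter than the 7-day default (local testing only)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Physically remove data files no longer referenced by the current table version
deltaTable.vacuum(0)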