Databricks는 데이터 처리 프레임워크인 Apache Spark와 파일 포멧 Apache Parquet 등을 개발해오면서 클라우드 환경에 적합한 data lake란 무엇인지 다음과 같이 고민해왔다고 합니다.
이러한 문제들을 해결해나가다보니 Delta Lake라는 훌륭한 오픈소스 스토리지가 탄생하였으며, 이를 Delta Lake를 사용해야할 5가지 이유와 함께 Spark Submit에서 발표한 적이 있습니다. [링크]
또한 Delta Lake는 데이터가 변화한 transaction log를 기록하고 있기 때문에, ACID에 의거한 read/write가 가능하다는 장점이 있습니다. 즉 스트리밍으로 데이터가 쌓이고 있는 중에도 사용자는 안정적으로 데이터를 다룰 수 있습니다. [참고]
제가 delta lake를 도입한 환경은 다음과 같습니다.
Delta lake를 도입하여 얻고자 했던 이점은 다음과 같습니다.
이제 본론으로 넘어가 delta lake를 도입하기 위해 코드레벨에서 필요한 작업들을 이야기해보겠습니다.
먼저 Maven에서 적절한 버전의 delta-core를 다운로드 받습니다.
curl "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar" -O
spark-submit시 다운로드 받은 jar파일도 함께 제출해줍니다.
spark-submit --master yarn --deploy-mode cluster \
--jars "delta-core_2.12-1.0.0.jar" \
...
delta-spark
라이브러리를 사용하고 싶다면, —py-files 옵션으로도 jar파일을 넘겨줍니다.spark.SparkContext.addPyFile()
로 해당 jar파일 경로를 넣어주면 importing이 가능합니다.builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
spark.sparkContext.addPyFile("s3://ridi-emr/jars/delta-core_2.12-1.0.0.jar")
Delta Table를 조작하기 위해 Spark Session을 생성하면서 Spark SQL DDL Extension를 추가해줍니다.
from pyspark.sql import SparkSession
builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
sc = spark.sparkContext
s3://your_location
와 같이 location 방식의 identifier를 이용할 수 있습니다.delta.compatibility.symlinkFormatManifest.enabled=true
으로 설정해주면 데이터 insert가 발생할때마다 뒤에서 설명할 manifest 파일을 업데이트 할 수 있습니다.CREATE TABLE IF NOT EXISTS delta.`s3://your_location` (
`key` STRING,
`value` STRING,
`topic` STRING,
`timestamp` TIMESTAMP,
`date` STRING
)
USING DELTA
PARTITIONED BY (date)
LOCATION 's3://your_location/'
TBLPROPERTIES (
'delta.compatibility.symlinkFormatManifest.enabled'='true'
)
GENERATE symlink_format_manifest FOR TABLE delta.`s3://your_location`
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
MSCK REPAIR TABLE database.table
6.Create Athena Table
에서 table property로 넘겨줄 수 있습니다.CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
TBLPROPERTIES (
'projection.enabled'='true',
'projection.date.type'='date',
'projection.date.format'='yyyy-MM-dd',
'projection.date.range'='NOW-2YEARS,NOW+1DAY',
'projection.date.interval'='1',
'projection.date.interval.unit'='DAYS'
)
Spark에서 테이블을 저장하거나 불러올 때 location 기반의 identifier를 사용해야 합니다.
spark.read.format("delta") \
.load("s3://your_location") \
.where("date='2022-04-17'")
SELECT *
FROM delta.`s3://your_location`
WHERE date='2021-11-25'
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://your_location")
partition_clause = "date='2022-04-17'"
num_partitions = 10
spark.read.format("delta") \
.load("s3://your_location") \
.where(partition_clause) \
.repartition(num_partitions) \
.write \
.format("delta") \
.option("dataChange", "false") \
.option("replaceWhere", partition_clause) \
.mode("overwrite") \
.save("s3://your_location")
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
설정을 추가하여 병렬 삭제가 가능합니다.VACUUM delta.`s3://your_location` RETAIN 72 HOURS