본 글은 Delta Lake 2.2.0 Table batch reads and writes 을 번역 및 정리하였습니다.
Delta Lake는 테이블에 대한 배치 읽기 및 쓰기를 수행하기 위한 Apache Spark DataFrame read 및 write API에서 제공하는 대부분의 옵션을 지원합니다.
테이블에 대한 많은 Delta Lake 작업에서는, SparkSession을 생성할 때 구성을 설정하여 Apache Spark DataSourceV2 및 Catalog API(3.0 이상)와 통합을 활성화합니다. 구성 방법은 Configure SparkSession을 참조하십시오.
Delta Lake는 메타스토어에서 정의된 테이블과 경로에서 정의된 테이블 두 가지 유형의 테이블을 생성할 수 있습니다.
메타스토어에서 정의된 테이블을 사용하려면, SparkSession을 생성할 때 구성을 설정하여 Apache Spark DataSourceV2 및 Catalog API와 통합을 활성화해야 합니다. 구성 방법은 Configure SparkSession을 참조하십시오.
다음과 같은 방법으로 테이블을 생성할 수 있습니다.
SQL DDL 명령어: Delta 테이블을 생성하기 위해 CREATE TABLE 및 REPLACE TABLE과 같이 Apache Spark에서 지원하는 표준 SQL DDL 명령어를 사용할 수 있습니다.
CREATE TABLE IF NOT EXISTS default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
CREATE OR REPLACE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
SQL은 또한 Hive 메타스토어에 항목을 생성하지 않고 경로에서 테이블을 생성하는 것도 지원합니다.
-- Create or replace table with path
CREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
# Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("default.people10m")
# Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
Spark DataFrameWriterV2 API를 사용하여 Delta 테이블을 생성할 수도 있습니다.
이 기능은 새롭게 추가된 기능으로 현재 미리보기 단계에 있습니다.
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
See the API documentation for details.
쿼리 또는 DML에서 분할 컬럼에 관련된 프리디케이트가 있는 경우 쿼리 또는 DML을 가속화하기 위해 데이터를 파티션할 수 있습니다. Delta 테이블을 생성할 때 데이터를 파티션하려면 파티션 기준 열을 지정하십시오. 다음 예제는 성별에 따라 파티션을 설정합니다.
-- Create table in the metastore
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
테이블이 특정 파티션을 포함하는지 여부를 결정하려면, 다음 문장을 사용하십시오.
SELECT COUNT(*) > 0 FROM WHERE = .
파티션이 존재하는 경우 true가 반환됩니다. 예를 들면 다음과 같습니다:
SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = "M"
display(spark.sql("SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = 'M'"))
메타스토어에서 정의된 테이블의 경우, 경로로 위치를 지정할 수도 있습니다. 지정된 위치로 생성된 테이블은 메타스토어에 의해 관리되지 않습니다. 경로가 지정되지 않은 관리형 테이블과는 달리, 관리되지 않는 테이블의 파일은 DROP 명령을 실행할 때 삭제되지 않습니다.
Delta Lake를 사용하여 데이터가 저장된 경로에서 이미 CREATE TABLE을 실행하는 경우, Delta Lake는 다음 작업을 수행합니다.
CREATE TABLE default.people10m
USING DELTA
LOCATION '/tmp/delta/people10m'
메타스토어의 테이블은 자동으로 기존 데이터의 스키마, 파티셔닝 및 테이블 속성을 상속합니다. 이 기능은 데이터를 메타스토어에 "Import" 데 사용할 수 있습니다.
configuration(스키마, 파티셔닝 또는 테이블 속성)을 지정하는 경우, Delta Lake는 지정된 configuration이 기존 데이터의 구성과 정확히 일치하는지 확인합니다.
💡 중요:지정된 configuration이 데이터의 구성과 정확히 일치하지 않는 경우, Delta Lake는 불일치 사항을 설명하는 예외를 throw합니다.
💡 참고:메타스토어는 Delta 테이블의 최신 정보에 대한 진리의 원천이 아닙니다. 실제로 메타스토어에서 테이블 정의는 스키마와 속성과 같은 모든 메타데이터를 포함하지 않을 수 있습니다. 메타스토어는 테이블의 위치를 포함하고 있으며, 해당 위치에서의 테이블의 트랜잭션 로그가 진리의 원천입니다. Delta에 특화된 사용자 지정을 인식하지 못하는 시스템에서 메타스토어를 쿼리하면 불완전하거나 오래된 테이블 정보가 표시될 수 있습니다.
이 기능은 새롭게 추가된 기능으로 현재 미리보기 단계에 있습니다.
Delta Lake는 생성된 열을 지원합니다. 생성된 열은 Delta 테이블의 다른 열을 기반으로 사용자 지정 함수에 따라 자동으로 생성되는 특수 유형의 열입니다. 생성된 열이 포함된 테이블에 기록하면, 해당 열의 값이 명시적으로 제공되지 않으면 Delta Lake가 자동으로 값을 계산합니다. 예를 들어, 타임스탬프 열에서 날짜 열(테이블 파티셔닝을 위해)을 자동으로 생성할 수 있습니다. 테이블에 기록할 때, 타임스탬프 열에 대한 데이터만 지정하면 됩니다. 그러나 값이 명시적으로 제공되면, 제약 조건 ( <=> ) IS TRUE을 충족해야 하며, 그렇지 않으면 오류가 발생합니다.
💡 중요:생성된 열을 포함한 테이블은 기본값보다 더 높은 테이블 작성자 프로토콜 버전을 가지고 있습니다. 테이블 프로토콜 버전을 이해하고 테이블 프로토콜 버전의 더 높은 버전을 가지는 것이 의미하는 것은 Table protocol versioning을 참조하십시오.
다음 예제는 생성된 열이 포함된 테이블을 생성하는 방법을 보여줍니다:
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
생성된 열은 일반적인 열처럼 저장됩니다. 즉, 저장 공간을 차지합니다.
다음은 생성된 열에 적용되는 제한 사항입니다:
다음 표현식 중 하나로 파티션 열이 정의된 경우, Delta Lake는 쿼리에 대한 파티션 필터를 생성할 수 있습니다.
파티션 열이 앞서 설명한 표현식 중 하나로 정의되고, 쿼리가 생성 식의 기본 기둥을 사용하여 데이터를 필터링하는 경우, Delta Lake는 기본 열과 생성된 열 간의 관계를 살펴보고 생성된 파티션 열에 대한 파티션 필터를 가능하면 자동으로 생성합니다. 예를 들어, 다음 테이블이 주어졌다면:
DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
.partitionedBy("eventType", "eventDate") \
.execute()
그러면 다음 쿼리를 실행할 수 있습니다:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake는 자동으로 파티션 필터를 생성하여 이전 쿼리가 파티션 필터가 지정되지 않은 경우에도 파티션 date = 2020-10-01의 데이터만 읽도록 합니다.
다른 예로는 다음과 같은 테이블이 주어졌다면:
DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
.addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
.addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
.partitionedBy("eventType", "year", "month", "day") \
.execute()
그러면 다음 쿼리를 실행할 수 있습니다:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake는 자동으로 파티션 필터를 생성하여 이전 쿼리가 파티션 필터가 지정되지 않은 경우에도 파티션 year=2020/month=10/day=01의 데이터만 읽도록 합니다.
Delta Lake에서 자동으로 생성된 파티션 필터가 있는지 확인하려면 EXPLAIN 절을 사용하고 제공된 계획을 확인할 수 있습니다.
기본적으로 공백 및 ,;{}()\n\t=와 같은 특수 문자는 테이블 열 이름에서 지원되지 않습니다. 이러한 특수 문자를 테이블의 열 이름에 포함하려면 열 매핑을 활성화하십시오.
SparkSession에서 설정한 Delta Lake 구성은 세션에서 생성된 새 Delta Lake 테이블의 기본 테이블 속성을 무시합니다. SparkSession에서 사용하는 접두사는 테이블 속성에서 사용하는 구성과 다릅니다.
Delta Lake conf | SparkSession conf |
---|---|
delta. | spark.databricks.delta.properties.defaults. |
예를 들어, 세션에서 생성된 모든 새 Delta Lake 테이블에 대해 delta.appendOnly = true 속성을 설정하려면 다음을 설정하십시오:
SET spark.databricks.delta.properties.defaults.appendOnly = true
기존 테이블의 테이블 속성을 수정하려면 SET TBLPROPERTIES를 사용하십시오.
테이블 이름 또는 경로를 지정하여 Delta 테이블을 DataFrame으로 로드할 수 있습니다.
SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path
spark.table("default.people10m") # query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") # query table by path
반환된 DataFrame은 쿼리에 대해 테이블의 가장 최신 스냅샷을 자동으로 읽으며, REFRESH TABLE을 실행할 필요가 없습니다. Delta Lake는 적용 가능한 프레디케이트가 쿼리에 있는 경우 파티셔닝 및 통계를 자동으로 사용하여 최소한의 데이터만 읽습니다.
Delta Lake의 타임 트래블 기능은 Delta 테이블의 이전 스냅샷을 쿼리할 수 있도록 해줍니다. 타임 트래블에는 다음과 같은 많은 사용 사례가 있습니다.
이 섹션에서는 테이블의 이전 버전을 쿼리하기 위한 지원 방법, 데이터 보존 문제 및 예제를 제공합니다.
💡 각 버전 N의 타임스탬프는 Delta 테이블 로그에서 버전 N에 해당하는 로그 파일의 타임스탬프에 따라 달라집니다. 따라서 타임스탬프에 의한 타임 트래블은 Delta 테이블 디렉터리 전체를 새 위치로 복사한 경우에 깨질 수 있습니다. 반면 버전에 의한 타임 트래블은 영향을 받지 않습니다.이 섹션에서는 Delta 테이블의 이전 버전을 쿼리하는 방법을 보여줍니다.
AS OF
syntaxSELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
timestamp_expression은 다음 중 하나가 될 수 있습니다.
버전은 DESCRIBE HISTORY table_spec의 출력에서 얻을 수 있는 long 값입니다.
timestamp_expression과 version 모두 하위 쿼리가 될 수 없습니다.
예시
Examples
SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123
DataFrameReader 옵션을 사용하면 Delta 테이블의 특정 버전으로 고정된 DataFrame을 생성할 수 있습니다.
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")
timestamp_string에는 날짜나 타임스탬프 문자열만 허용됩니다. 예를 들어, "2019-01-01" 및 "2019-01-01T00:00:00.000Z"와 같은 문자열이 허용됩니다.
다운스트림 응용 프로그램을 업데이트하기 위해 작업 실행 중 Delta 테이블의 최신 상태를 사용하는 것이 일반적인 패턴입니다.
Delta 테이블은 자동으로 업데이트되므로, Delta 테이블에서 로드한 DataFrame은 기본 데이터가 업데이트될 경우 호출 간에 서로 다른 결과를 반환할 수 있습니다. 타임 트래블을 사용하여 DataFrame에서 호출 간에 반환된 데이터를 고정할 수 있습니다.
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/people10m`")
latest_version = history.selectExpr("max(version)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
Examples
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
df.where("userId = 111").write.format("delta").mode("append").save("/tmp/delta/events")
• 테이블에 실수로 잘못된 업데이트가 있어 수정이 필요합니다.
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
df.createOrReplaceTempView("my_table_yesterday")
spark.sql('''
MERGE INTO delta.`/tmp/delta/events` target
USING my_table_yesterday source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
''')
• 지난 주에 추가된 신규 고객의 수를 쿼리해주세요.
last_week = spark.sql("SELECT CAST(date_sub(current_date(), 7) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", last_week).load("/tmp/delta/events")
last_week_count = df.select("userId").distinct().count()
count = spark.read.format("delta").load("/tmp/delta/events").select("userId").distinct().count()
new_customers_count = count - last_week_count
이전 버전으로 시간 여행하려면 해당 버전의 로그 및 데이터 파일을 모두 유지해야합니다.
Delta 테이블을 백업하는 데이터 파일은 자동으로 삭제되지 않습니다. VACUUM을 실행할 때만 데이터 파일이 삭제됩니다. VACUUM은 Delta 로그 파일을 삭제하지 않으며 로그 파일은 체크포인트가 작성된 후 자동으로 정리됩니다.
기본적으로, Delta 테이블에서 30일 이전으로만 시간 여행할 수 있습니다. 다음 조건 중 하나를 충족하는 경우에는 더 오래된 시점으로 이동할 수 있습니다:
Delta는 체크포인트가 작성될 때마다 보존 기간보다 오래된 로그 항목을 자동으로 정리합니다. 이 구성을 충분히 큰 값으로 설정하면 많은 로그 항목이 보존됩니다. 로그에 대한 작업은 일정한 시간이 소요되므로 성능에 영향을 미치지 않습니다. 기록 작업은 병렬로 수행되지만 로그 크기가 커지면 더 비싸지게 됩니다.
Delta 테이블에서 VACUUM을 실행하더라도 30일간의 기록 데이터에 액세스하려면 delta.deletedFileRetentionDuration = "interval 30 days"로 설정하세요. 이 설정은 저장 비용이 증가할 수 있습니다.
💡 참고:로그 항목 정리로 인해 유지 기간보다 작은 버전으로 시간 여행할 수 없는 경우가 발생할 수 있습니다. Delta Lake는 특정 버전으로 시간 여행하기 위해 이전 체크포인트 이후의 모든 연속 로그 항목을 필요로합니다. 예를 들어, [0, 19] 버전의 로그 항목으로 구성된 테이블과 버전 10의 체크포인트가 있는 경우, 버전 0의 로그 항목이 정리되면 [1, 9] 버전으로 시간 여행할 수 없습니다. delta.logRetentionDuration 테이블 속성을 늘리면 이러한 상황을 피할 수 있습니다.
기존 Delta 테이블에 원자적으로 새 데이터를 추가하려면 append 모드를 사용하세요:
INSERT INTO default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
테이블의 모든 데이터를 원자적으로 교체하려면 overwrite 모드를 사용하세요:
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
DataFrames를 사용하면 임의의 표현식과 일치하는 데이터만 선택적으로 덮어쓸 수도 있습니다. 이 기능은 Delta Lake 1.1.0 이상에서 사용할 수 있습니다. 다음 명령은 start_date로 파티셔닝된 대상 테이블에서 1월 이벤트를 df의 데이터로 원자적으로 대체합니다:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
.save("/tmp/delta/events")
이 샘플 코드는 df의 데이터를 기록하고 모두가 조건절과 일치하는지 검증한 후 원자적으로 대체합니다. 대상 테이블에서 조건에 일치하지 않는 데이터를 모두 기록하고 대체하려면, 제약 조건 검사를 비활성화하면 됩니다. 이를 위해서는 spark.databricks.delta.replaceWhere.constraintCheck.enabled를 false로 설정하면 됩니다:
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Delta Lake 1.0.0 이하에서 replaceWhere는 파티션 열에 대한 조건절과 일치하는 데이터만 덮어쓰기합니다. 다음 명령은 date로 파티셔닝된 대상 테이블에서 1월의 월을 df의 데이터로 원자적으로 대체합니다:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
.save("/tmp/delta/people10m")
Delta Lake 1.1.0 이상에서 이전 동작으로 되돌아가려면, spark.databricks.delta.replaceWhere.dataColumns.enabled 플래그를 비활성화할 수 있습니다.
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Delta Lake 2.0 이상에서는 파티션이 지정된 테이블의 동적 파티션 덮어쓰기 모드를 지원합니다.
동적 파티션 덮어쓰기 모드에서는 새 데이터가 커밋될 로직 파티션의 모든 기존 데이터를 덮어쓰게 됩니다. 데이터가 포함되어 있지 않은 기존 로직 파티션은 변경되지 않습니다. 이 모드는 덮어쓰기 모드에서 데이터를 쓸 때만 적용됩니다: SQL에서 INSERT OVERWRITE 또는 df.write.mode("overwrite")를 사용하는 경우입니다.
Spark 세션 구성의 spark.sql.sources.partitionOverwriteMode를 dynamic으로 설정하여 동적 파티션 덮어쓰기 모드를 구성할 수 있습니다. 또한 DataFrameWriter 옵션 partitionOverwriteMode를 dynamic으로 설정하여 이 기능을 활성화할 수도 있습니다. 쿼리별 옵션이 있는 경우 세션 구성에서 정의된 모드를 재정의합니다. partitionOverwriteMode의 기본값은 static입니다.
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
df.write \
.format("delta") \
.mode("overwrite") \
.option("partitionOverwriteMode", "dynamic") \
.saveAsTable("default.people10m")
💡 참고:
동적 파티션 덮어쓰기는 파티션이 지정된 테이블의 replaceWhere 옵션과 충돌합니다.
동적 파티션 덮어쓰기로 작성한 데이터가 예상된 파티션만 접근하는지 검증하세요. 잘못된 파티션에 있는 단일 행이 전체 파티션을 실수로 덮어쓰게 될 수 있습니다. 어떤 데이터를 덮어쓸지 지정하기 위해 replaceWhere를 사용하는 것이 좋습니다.
파티션을 실수로 덮어쓴 경우, Delta 테이블을 이전 상태로 복원하여 변경 사항을 취소할 수 있습니다.
테이블 삭제, 업데이트 및 병합을 참조하여 Delta Lake에서 테이블 업데이트를 지원합니다.
Delta Lake 테이블의 단일 파일에 쓸 최대 레코드 수를 지정하려면 SQL 세션 구성인 spark.sql.files.maxRecordsPerFile을 사용할 수 있습니다. 값으로 0 또는 음수를 지정하면 제한이 없음을 나타냅니다.
데이터 프레임 API를 사용하여 Delta Lake 테이블에 기록할 때도 DataFrameWriter 옵션 maxRecordsPerFile을 사용할 수 있습니다. maxRecordsPerFile이 지정되면 SQL 세션 구성인 spark.sql.files.maxRecordsPerFile의 값을 무시합니다.
df.write.format("delta") \
.mode("append") \
.option("maxRecordsPerFile", "10000") \
.save("/tmp/delta/people10m")
데이터를 Delta 테이블에 기록하는 작업이 종종 다양한 이유로 다시 시작될 수 있습니다 (예: 작업이 실패). 실패한 작업은 종료되기 전에 데이터를 Delta 테이블에 기록할 수도, 기록하지 못할 수도 있습니다. 데이터가 Delta 테이블에 기록된 경우, 재시작된 작업은 중복 데이터를 생성하는 동일한 데이터를 Delta 테이블에 기록합니다.
이를 해결하기 위해 Delta 테이블은 다음과 같은 DataFrameWriter 옵션을 지원하여 기록을 멱등하게 만듭니다:
상기 옵션 조합은 Delta 테이블로 적재되는 새 데이터마다 고유해야 하며, txnVersion은 Delta 테이블로 적재된 마지막 데이터보다 커야 합니다. 예를 들어:
이 솔루션은 작업의 여러 재시도에서 Delta 테이블에 기록되는 데이터가 동일한 것으로 가정합니다. Delta 테이블의 기록 시도가 성공하지만 하위 스트림 실패로 인해 동일한 txn 옵션으로 다른 데이터로 두 번째 기록 시도가 발생하는 경우 두 번째 기록 시도는 무시됩니다. 이는 예기치 않은 결과를 초래할 수 있습니다.
Example
app_id = ... # A unique string that is used as an application ID.
version = ... # A monotonically increasing number that acts as transaction version.
dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", app_id).save(...)
이러한 작업으로 생성된 커밋에 사용자 정의 문자열 메타데이터를 지정할 수 있습니다. 이를 위해 DataFrameWriter 옵션 userMetadata 또는 SparkSession 구성 spark.databricks.delta.commitInfo.userMetadata를 사용할 수 있습니다. 둘 다 지정된 경우 옵션이 우선합니다. 이 사용자 정의 메타데이터는 history 작업에서 읽을 수 있습니다.
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople
df.write.format("delta") \
.mode("overwrite") \
.option("userMetadata", "overwritten-for-fixing-incorrect-data") \
.save("/tmp/delta/people10m")
Delta Lake는 DataFrame에서 테이블로의 쓰기가 호환되는지 여부를 결정하기 위해 다음 규칙을 사용합니다.
Delta Lake는 새 열을 명시적으로 추가하고 스키마를 자동으로 업데이트하는 DDL을 지원합니다.
append 모드와 함께 partitionBy와 같은 다른 옵션을 지정하는 경우 Delta Lake는 해당 옵션이 일치하는지 확인하고 불일치하는 경우 오류가 발생합니다. partitionBy가 없으면 append는 기존 데이터의 파티셔닝을 자동으로 따릅니다.
Delta Lake를 사용하면 테이블 스키마를 업데이트할 수 있습니다. 다음 유형의 변경을 지원합니다.
이러한 변경 사항은 DDL을 사용하여 명시적으로 또는 DML을 사용하여 암시적으로 수행할 수 있습니다.
💡 중요:Delta 테이블 스키마를 업데이트하면 해당 테이블에서 읽는 스트림이 종료됩니다. 스트림을 계속하려면 다시 시작해야 합니다.
테이블 스키마를 명시적으로 변경하려면 다음과 같은 DDL을 사용할 수 있습니다.
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
기본적으로 nullability가 true입니다.
중첩된 필드에 열을 추가하려면 다음을 사용합니다.
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Example
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)을 실행하기 전의 스키마가 다음과 같다면:
- root
| - colA
| - colB
| +-field1
| +-field2
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
💡 참고:
중첩된 열 추가는 struct에 대해서만 지원됩니다. 배열과 맵은 지원되지 않습니다.
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
중첩된 필드의 열을 변경하려면 다음을 사용합니다.
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
Example
ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST 하기 전 스키마가 다음과 같다면:
- root
| - colA
| - colB
| +-field1
| +-field2
- root
| - colA
| - colB
| +-field2
| +-field1
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Example
다음 DDL을 실행할 때:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
만약 그 전의 스키마가 다음과 같다면:
- root
| - colA
| - colB
| +-field1
| +-field2
실행 후의 스키마는 다음과 같습니다:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
이 기능은 Delta Lake 1.2.0 이상에서 사용할 수 있습니다. 이 기능은 현재 실험적입니다.
기존 열의 데이터를 재작성하지 않고 열 이름을 변경하려면, 테이블에 대한 열 매핑을 활성화해야 합니다. 열 매핑 활성화를 참조하세요.
열 이름을 변경하려면:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
중첩된 필드의 이름을 바꾸려면:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Example
다음 명령을 실행할 때:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
만약 이전 스키마가 다음과 같다면:
- root
| - colA
| - colB
| +-field1
| +-field2
그러면 실행 후의 스키마는 다음과 같습니다:
- root
| - colA
| - colB
| +-field001
| +-field2
이 기능은 Delta Lake 2.0 이상에서 사용할 수 있습니다. 이 기능은 현재 실험적입니다.
데이터 파일을 재작성하지 않고 메타데이터만으로 열을 삭제하려면, 테이블에 대한 열 매핑을 활성화해야 합니다. 열 매핑 활성화를 참조하세요.
💡 중요:메타데이터에서 열을 삭제하면 해당 열의 기본 데이터 파일이 삭제되지 않습니다.
열을 삭제하려면:
ALTER TABLE table_name DROP COLUMN col_name
여러 열을 삭제하려면:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
테이블을 다시 작성하여 열의 유형 또는 이름을 변경하거나 열을 삭제할 수 있습니다. 이를 위해서는 overwriteSchema 옵션을 사용하세요:
열 유형 변경
spark.read.table(...) \
.withColumn("birthDate", col("birthDate").cast("date")) \
.write \
.format("delta") \
.mode("overwrite")
.option("overwriteSchema", "true") \
.saveAsTable(...)
열 이름 변경
spark.read.table(...) \
.withColumnRenamed("dateOfBirth", "birthDate") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(...)
Delta Lake는 DML 트랜잭션 (추가 또는 덮어쓰기)의 일부로 테이블의 스키마를 자동으로 업데이트하고 쓰여지는 데이터와 호환되도록 할 수 있습니다.
DataFrame에는 있지만 테이블에는 없는 열은 다음 조건이 충족될 때 쓰기 트랜잭션의 일부로 자동으로 추가됩니다:
두 옵션이 모두 지정된 경우, DataFrameWriter의 옵션이 우선합니다. 추가된 열은 해당 구조체의 끝에 추가됩니다. 새 열을 추가할 때 대소문자가 유지됩니다.
NullType
columnsParquet은 NullType을 지원하지 않기 때문에 Delta 테이블로 쓸 때 NullType 열은 삭제되지만 스키마에는 여전히 저장됩니다. 해당 열에 대해 다른 데이터 유형이 수신되면 Delta Lake는 스키마를 새 데이터 유형으로 병합합니다. Delta Lake가 기존 열에 대해 NullType을 수신하면 이전 스키마가 유지되고 쓰기 중에 새 열이 삭제됩니다.
스트리밍에서는 NullType을 지원하지 않습니다. 스키마를 설정해야 하므로 이러한 경우는 매우 드물 것입니다. NullType은 ArrayType 및 MapType과 같은 복합 유형에서도 허용되지 않습니다.
기본적으로 테이블의 데이터를 덮어쓰면 스키마는 덮어쓰지 않습니다. replaceWhere 없이 mode("overwrite")를 사용하여 테이블을 덮어쓸 때, 여전히 쓰여지는 데이터의 스키마를 덮어쓰고 싶을 수 있습니다. overwriteSchema 옵션을 true로 설정하여 테이블의 스키마와 파티셔닝을 덮어쓸 수 있습니다:
df.write.option("overwriteSchema", "true")
Delta Lake는 데이터 소스 테이블과 마찬가지로 Delta 테이블 위에 뷰를 생성하는 것을 지원합니다.
뷰를 사용할 때 핵심적인 문제는 스키마를 해결하는 것입니다. Delta 테이블 스키마를 변경하면 파생 뷰를 다시 생성하여 스키마의 추가 사항을 처리해야 합니다. 예를 들어 Delta 테이블에 새 열을 추가하면 해당 기본 테이블 위에 구축된 적절한 뷰에서 해당 열을 사용할 수 있도록 해야 합니다.
자신의 메타데이터를 CREATE 및 ALTER에서 TBLPROPERTIES를 사용하여 테이블 속성으로 저장할 수 있습니다. 그런 다음 해당 메타데이터를 SHOW할 수 있습니다. 예를 들어:
ALTER TABLE default.people10m SET TBLPROPERTIES ('department' = 'accounting', 'delta.appendOnly' = 'true');
-- Show the table's properties.
SHOW TBLPROPERTIES default.people10m;
-- Show just the 'department' table property.
SHOW TBLPROPERTIES default.people10m ('department');
TBLPROPERTIES는 Delta 테이블 메타데이터의 일부로 저장됩니다. 따라서 특정 위치에 Delta 테이블이 이미 존재하는 경우 CREATE 문에서 새로운 TBLPROPERTIES를 정의할 수 없습니다.
또한 Delta Lake는 동작 및 성능을 조정하기 위해 다음과 같은 Delta 테이블 속성을 지원합니다.
Delta 테이블 속성을 수정하면 다른 동시 쓰기 작업과 충돌하여 실패할 수 있는 쓰기 작업입니다. 테이블 속성을 수정할 때는 테이블에 동시 쓰기 작업이 없는 경우에만 수정하는 것이 좋습니다.
또한, Spark 설정을 사용하여 Delta 테이블의 첫 번째 커밋에서 delta.- 접두사가 붙은 속성을 설정할 수도 있습니다. 예를 들어, delta.appendOnly=true 속성으로 Delta 테이블을 초기화하려면 Spark 설정 spark.databricks.delta.properties.defaults.appendOnly를 true로 설정하면 됩니다.
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
Delta 테이블 속성 참조를 참조하세요.
Delta Lake는 테이블 메타데이터를 탐색하기 위한 다양한 기능을 제공합니다.
SHOW COLUMNS 및 DESCRIBE TABLE을 지원합니다.
또한 다음과 같은 고유한 명령을 제공합니다.
Delta 테이블의 스키마, 파티셔닝, 테이블 크기 등의 정보를 제공합니다. 자세한 내용은 Delta 테이블 세부 정보 검색을 참조하세요.
DESCRIBE DETAIL '/data/events/'
DESCRIBE DETAIL eventsTable
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
detailDF = deltaTable.detail()
Delta 테이블의 스키마, 파티셔닝, 테이블 크기 등의 정보를 제공합니다. 자세한 내용은 Delta 테이블 세부 정보 검색을 참조하세요.
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history() # get the full history of the table
lastOperationDF = deltaTable.history(1) # get the last operation
Delta Lake의 많은 작업에서, 새로운 SparkSession을 생성할 때 다음 구성을 설정하여 Apache Spark DataSourceV2 및 Catalog API(3.0부터)와의 통합을 활성화합니다.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
또한, 명령 줄 매개변수로 지정하여 spark-submit 또는 spark-shell 또는 pyspark를 시작할 때 구성을 추가할 수 있습니다.
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Delta Lake는 저장소 시스템에 액세스하기 위해 Hadoop FileSystem API를 사용합니다. 일반적으로 저장소 시스템의 자격 증명은 Hadoop 구성을 통해 설정할 수 있습니다. Delta Lake는 Apache Spark와 유사한 여러 가지 방법으로 Hadoop 구성을 설정할 수 있습니다.
클러스터에서 Spark 애플리케이션을 시작할 때, spark.hadoop.* 형식의 Spark 구성을 설정하여 사용자 지정 Hadoop 구성을 전달할 수 있습니다. 예를 들어, spark.hadoop.a.b.c에 대한 값을 설정하면 해당 값을 Hadoop 구성 a.b.c로 전달하고 Delta Lake는 해당 값으로 Hadoop FileSystem API에 액세스합니다.
자세한 내용은 Spark 문서를 참조하세요.
Spark SQL은 현재 SQL 세션 구성을 모두 Delta Lake로 전달하고, Delta Lake는 해당 구성을 사용하여 Hadoop FileSystem API에 액세스합니다. 예를 들어, SET a.b.c=x.y.z는 Delta Lake에게 Hadoop 구성 a.b.c의 값을 x.y.z로 전달하라고 지시하고, Delta Lake는 해당 값을 Hadoop FileSystem API에 액세스하는 데 사용합니다.
Spark (클러스터) 구성 또는 SQL 세션 구성을 통해 Hadoop 파일 시스템 구성을 설정하는 것 외에도, Delta는 DataFrameReader 및 DataFrameWriter 옵션 (즉, fs. 접두사로 시작하는 옵션 키)을 통해 Hadoop 파일 시스템 구성을 읽고 쓸 수 있습니다. 이를 위해서는 DataFrameReader.load(path) 또는 DataFrameWriter.save(path)를 사용합니다.
예를 들어, DataFrame 옵션을 통해 저장소 자격 증명을 전달할 수 있습니다.
df1 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \
.read("...")
df2 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \
.read("...")
df1.union(df2).write.format("delta") \
.mode("overwrite") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \
.save("...")