본 글은 Delta Lake 2.2.0 Storage configuration 을 번역 및 정리하였습니다.
Delta Lake ACID 보증은 저장 시스템의 원자성과 내구성 보증에 기반합니다. 특히 델타 레이크는 저장 시스템과 상호작용할 때 다음을 의존합니다:
저장 시스템이 이러한 보증을 기본적으로 제공하지 않을 수 있으므로, Delta Lake 트랜잭션 작업은 보통 저장 시스템에 직접 액세스하는 대신 LogStore API를 통해 수행됩니다. 다른 저장 시스템에 대한 ACID 보증을 제공하려면 다른 LogStore 구현을 사용해야 할 수 있습니다. 이 문서에서는 다양한 저장 시스템에 대해 델타 레이크를 구성하는 방법을 다룹니다. 저장 시스템에는 두 가지 범주가 있습니다.
spark.delta.logStore.<scheme>.impl=<full-qualified-class-name>
여기서 은 저장 시스템의 경로의 스키마입니다. 이를 통해 Delta Lake는 해당 경로에 대해서만 지정된 LogStore 구현을 동적으로 사용하도록 구성됩니다. 여러 스키마에 대해 애플리케이션에서 다양한 구성을 가질 수 있으므로, 서로 다른 저장 시스템에서 동시에 읽고 쓸 수 있습니다.
💡 로컬 파일 시스템에서 Delta Lake는 동시 트랜잭션 쓰기를 지원하지 않을 수 있습니다. 이는 로컬 파일 시스템이 원자적인 이름 변경을 지원하지 않을 수도 있기 때문입니다. 따라서 동시 쓰기를 테스트하기 위해 로컬 파일 시스템을 사용해서는 안 됩니다.버전 1.0 이전에는 Delta Lake가 spark.delta.logStore.class를 설정하여 LogStore를 구성할 수 있었습니다. 이 방법은 이제 사용이 중단되었습니다. 이 구성을 설정하면 모든 경로에 대해 구성된 LogStore가 사용되어 동적인 스키마 기반 위임이 비활성화됩니다.
Delta Lake는 S3에서 읽기 및 쓰기를 두 가지 다른 모드로 지원합니다: 싱글 클러스터 및 멀티 클러스터 모드입니다.
Single-cluster | Multi-cluster | |
---|---|---|
Configuration | 델타 레이크는 기본적으로 제공되며, 추가 구성 없이 사용할 수 있습니다. | 실험적이며 추가 구성이 필요합니다. |
Reads | 멀티 클러스터 모드는 여러 클러스터에서 동시에 읽기를 지원합니다. | 멀티 클러스터 모드는 여러 클러스터에서 동시에 읽기를 지원합니다. |
Writes | 싱글 클러스터 모드에서는 단일 Spark 드라이버에서 동시 쓰기를 지원합니다. | 멀티 클러스터 모드에서는 멀티 클러스터 쓰기를 지원합니다. |
Permissions | S3 credentials | S3 및 DynamoDB 작업 권한이 필요합니다. |
기본 모드에서 Delta Lake는 여러 클러스터에서 동시에 읽기를 지원하지만, S3로의 동시 쓰기는 델타 레이크가 트랜잭션 보증을 제공하기 위해 하나의 Spark 드라이버에서 시작되어야 합니다. 이는 S3가 현재 상호 배제를 제공하지 않기 때문입니다. 즉, 하나의 작성자만 파일을 생성할 수 있는 방법을 보장할 수 없습니다.
💡 경고:여러 Spark 드라이버에서 S3 저장소의 동일한 Delta 테이블에 대해 동시에 쓰기 작업을 수행하면 데이터 손실이 발생할 수 있습니다. 멀티 클러스터 솔루션을 사용하려면 아래의 멀티 클러스터 설정 섹션을 참조하십시오.
이 섹션에서는 싱글 클러스터 모드를 사용하여 S3에서 Delta 테이블을 빠르게 읽고 쓰는 방법에 대해 설명합니다. 구성에 대한 자세한 설명은 설정 구성(S3 멀티 클러스터)을 참조하십시오.
bin/spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.1 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
// Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:
spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()
다른 언어 및 Delta 테이블 작업에 대한 더 많은 예제는 퀵스타트 페이지를 참조하십시오.
다음은 S3에 대해 Delta Lake를 구성하는 단계입니다.
Delta Lake는 S3를 위해 Hadoop의 FileSystem API를 구현한 hadoop-aws 패키지의 org.apache.hadoop.fs.s3a.S3AFileSystem 클래스가 필요합니다. 이 패키지의 버전이 Spark가 빌드된 Hadoop 버전과 일치하는지 확인하십시오.
인증 및 권한 부여를 위해 IAM 역할을 사용하는 것이 좋습니다. 그러나 키를 사용하려는 경우, Hadoop 구성을 설정하는 방법은 다음과 같습니다(Scala):
sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-s3-access-key>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-s3-secret-key>")
-------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration
val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.access.key", "<ACCESS_KEY>")
hadoopConf.set("fs.s3a.secret.key", "<SECRET_KEY>")
spark.sparkContext.hadoopConfiguration.addResource(hadoopConf)
이 지원은 새롭고 실험적입니다.
이 모드는 다중 클러스터에서 S3로의 동시 쓰기를 지원하며, Delta Lake를 올바른 LogStore 구현을 사용하도록 구성함으로써 명시적으로 활성화해야합니다. 이 구현은 S3가 부족한 상호 배제를 제공하기 위해 DynamoDB를 사용합니다.
💡 경고:이 멀티 클러스터 쓰기 솔루션은 모든 작성자가 이 LogStore 구현과 동일한 DynamoDB 테이블 및 지역을 사용할 때만 안전합니다. 일부 드라이버가 기본 Delta Lake를 사용하고 다른 드라이버가 이 실험적인 LogStore를 사용하는 경우 데이터 손실이 발생할 수 있습니다.
이 섹션에서는 멀티 클러스터 모드를 사용하여 S3에서 Delta 테이블을 빠르게 읽고 쓰는 방법에 대해 설명합니다.
bin/spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-storage-s3-dynamodb:2.2.0 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \
--conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \
--conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2
// Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:
spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()
DynamoDB 테이블을 직접 생성하거나 자동으로 생성하도록 선택할 수 있습니다. (권장)
이 DynamoDB 테이블은 여러 Delta 테이블의 커밋 메타데이터를 유지하며, 사용 사례에 맞는 Read/Write Capacity Mode(예: 요청에 따라 또는 프로비저닝)로 구성하는 것이 중요합니다. 따라서 DynamoDB 테이블을 직접 만드는 것을 강력히 권장합니다. 다음 예제는 AWS CLI를 사용합니다. 자세한 내용은 create-table 명령 참조를 참조하십시오.
aws dynamodb create-table \
--region us-east-1 \
--table-name delta_log \
--attribute-definitions AttributeName=tablePath,AttributeType=S \
AttributeName=fileName,AttributeType=S \
--key-schema AttributeName=tablePath,KeyType=HASH \
AttributeName=fileName,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
참고: 테이블 이름과 지역을 선택하면 이 멀티 클러스터 모드가 올바르게 작동하도록 각 Spark 세션에서 지정해야 합니다. 아래 표를 참조하세요.
그러나 기본 DynamoDB 테이블이 이미 존재하지 않는 경우 이 LogStore 구현을 지정한 후에 자동으로 생성됩니다. 이 기본 테이블은 초당 5번의 강력한 일관성 있는 읽기 및 쓰기를 지원합니다. 이 기본값을 테이블-생성 전용 설정 키를 사용하여 변경할 수 있습니다. 자세한 내용은 아래 표를 참조하세요.
먼저, s3 스키마에 대해 이 LogStore 구현을 구성합니다. s3a 및 s3n 스키마에 대해서도 이 명령어를 복제할 수 있습니다.
spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore
그 다음, DynamoDB 클라이언트를 인스턴스화하는 데 필요한 추가 정보를 지정합니다. 이 멀티 클러스터 모드에서 올바르게 작동하려면 매번 같은 tableName 및 region으로 DynamoDB 클라이언트를 인스턴스화해야 합니다. 아래는 세션별 구성과 기본값 목록입니다.
Configuration Key | Description | Default |
---|---|---|
spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName | 사용할 DynamoDB 테이블의 이름입니다. | delta_log |
spark.io.delta.storage.S3DynamoDBLogStore.ddb.region | 클라이언트에서 사용할 리전입니다. | us-east-1 |
spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider | 클라이언트에서 사용하는 AWSCredentialsProvider*입니다. | https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html |
spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu | (테이블 생성 전용**) Read Capacity Units입니다. | 5 |
spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu | (테이블 생성 전용**) Write Capacity Units입니다. | 5 |
이 시점에서, 이 멀티 클러스터 설정은 완전히 작동합니다. 그러나 프로덕션 환경에서 실행할 때 성능을 향상시키고 저장소를 최적화하는 데 유용한 추가 구성이 있습니다.
이 LogStore 구현에서 자동으로 생성된 기본 DynamoDB 테이블을 사용하는 경우 기본 RCU 및 WCU가 워크로드에 충분하지 않을 수 있습니다. 프로비저닝 처리량을 조정하거나 온디맨드 모드로 업데이트할 수 있습니다.
DynamoDB 메타 데이터 항목이 완료로 표시되고 그에 해당하는 Delta 파일에 대한 우연한 덮어쓰기를 방지하는 데 S3만으로 충분한 시간이 경과한 후, 해당 항목을 DynamoDB에서 삭제해도 안전합니다. 이 작업을 수행하는 가장 저렴한 방법은 DynamoDB의 TTL 기능을 사용하는 것입니다. 이 기능은 DynamoDB 테이블에서 항목을 삭제하기 위한 무료 자동화된 수단입니다.
TTL을 활성화하려면 지정된 DynamoDB 테이블에서 다음 명령을 실행하세요.
aws dynamodb update-time-to-live \
--region us-east-1 \
--table-name delta_log \
--time-to-live-specification "Enabled=true, AttributeName=expireTime"
DynamoDB 항목이 완료로 표시된 후 기본 만료 시간은 하루입니다.
이 LogStore 구현에서는 Delta 로그에 커밋될 메타데이터 사본이 포함된 temp 파일이 생성됩니다. Delta 로그로의 커밋이 완료되고 해당 DynamoDB 항목이 제거된 후 이 temp 파일을 삭제해도 안전합니다. 실제로 실패한 커밋을 복구하는 동안 최신 temp 파일만 사용됩니다.
다음은이 temp 파일을 삭제하는 두 가지 간단한 옵션입니다.
이것은 가장 안전한 옵션입니다. 다음 명령은 및
에서 가장 최근의 temp 파일을 제외하고 모두 삭제합니다.aws s3 ls s3://<bucket>/<delta_table_path>/_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1 | while read -r line ; do
echo "Removing ${line}"
aws s3 rm s3://<bucket>/<delta_table_path>/_delta_log/.tmp/${line}
done
보다 자동화된 옵션은 S3 라이프사이클 만료 규칙을 사용하는 것입니다. 이 규칙은 테이블 경로에있는 <delta_table_path>/_delta_log/.tmp/ 폴더를 가리키는 필터 접두사와 30 일의 만료 값을 가지도록 구성됩니다.
참고 : 충분히 큰 만료 값을 선택하는 것이 중요합니다. 위에서 언급했듯이, 실패한 커밋 복구 중에 가장 최근의 temp 파일이 사용됩니다. 이 temp 파일이 삭제되면 DynamoDB 테이블과 S3 <delta_table_path>/_delta_log/.tmp/ 폴더가 동기화되지 않습니다.
AWS 문서에서 설명하는대로 버킷 수명주기 구성을 구성하는 다양한 방법이 있습니다.
put-bucket-lifecycle-configuration 명령을 사용하는 방법이 있습니다. 자세한 내용은 S3 수명주기 구성을 참조하십시오. 아래에 예제 규칙과 명령 호출이 제공됩니다.
file://lifecycle.json에 참조되는 파일에서 :
{
"Rules":[
{
"ID":"expire_tmp_files",
"Filter":{
"Prefix":"path/to/table/_delta_log/.tmp/"
},
"Status":"Enabled",
"Expiration":{
"Days":30
}
}
]
}
aws s3api put-bucket-lifecycle-configuration \
--bucket my-bucket \
--lifecycle-configuration file://lifecycle.json
💡 참고
AWS S3는 버킷당 규칙 수에 대한 제한이 있을 수 있습니다. 자세한 내용은 PutBucketLifecycleConfiguration을 참조하십시오.
다중 클러스터에서 동시에 읽고 쓰기에 대한 전체 트랜잭션 보장을 제공하는 HDFS에 대한 Delta Lake의 내장 지원이 있습니다. 자격 증명을 구성하는 방법에 대해서는 Hadoop 및 Spark 문서를 참조하십시오.