Best practices

유상기·2023년 2월 21일
0

Delta Lake

목록 보기
14/16
post-thumbnail

본 글은 Delta Lake 2.2.0 Best practices 을 번역 및 정리하였습니다.

이 문서에서는 Delta Lake 사용 시 최선의 방법에 대해 설명합니다.

Choose the right partition column

Delta Lake 테이블을 열로 분할 할 수 있습니다. 가장 일반적으로 사용되는 분할 열은 날짜입니다. 분할할 열을 결정할 때 다음 두 가지 규칙을 따르세요.

  • 열의 cardinality가 매우 높을 경우 해당 열을 분할에 사용하지 마십시오. 예를 들어 userId 열로 분할하고 1 백만 개의 고유 사용자 ID가 있는 경우 이는 좋지 않은 분할 전략입니다.
  • 각 파티션의 데이터 양 : 해당 파티션에 데이터가 적어도 1GB 이상 있을 것으로 예상하는 경우 해당 열을 기준으로 분할할 수 있습니다.

Compact files

Delta 테이블에 지속적으로 데이터를 기록하면 시간이 지남에 따라 많은 수의 파일이 누적됩니다. 특히 작은 batch 단위로 데이터를 추가하는 경우에는 더 그러합니다. 이렇게 되면 테이블 읽기의 효율에 부정적인 영향을 미치며 파일 시스템의 성능에도 영향을 미칠 수 있습니다. 이상적으로는 많은 수의 작은 파일을 정기적으로 더 적은 수의 큰 파일로 다시 작성해야 합니다. 이를 컴팩션이라고 합니다.

테이블을 재파티셔닝하여 더 작은 수의 파일로 압축할 수 있습니다. 또한 데이터가 변경되지 않고 데이터 레이아웃만 재배치되기 때문에 다른 동시 작업에 최소한의 영향을 미치도록 옵션 dataChange을 false로 지정할 수 있습니다.

예를 들어, 테이블을 16개 파일로 압축할 수 있습니다.

path = "..."
numFiles = 16

(spark.read
 .format("delta")
 .load(path)
 .repartition(numFiles)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .save(path))

테이블이 파티션화되어 있고, 조건에 따라 하나의 파티션만 재분할하려는 경우, WHERE을 사용하여 해당 파티션만 읽은 후 replaceWhere를 사용하여 다시 작성할 수 있습니다.

path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16

(spark.read
 .format("delta")
 .load(path)
 .where(partition)
 .repartition(numFilesPerPartition)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", partition)
 .save(path))
💡 경고

데이터를 변경하는 작업에서 dataChange = false를 사용하면 테이블의 데이터가 손상될 수 있습니다.

💡 참고

이 작업은 이전 파일을 제거하지 않습니다. 이전 파일을 제거하려면 VACUUM 명령을 실행하십시오.

Replace the content or schema of a table

Delta 테이블을 대체하려는 경우가 있습니다. 예를 들어,

  • 테이블의 데이터가 잘못되었을 때 내용을 대체하려고 합니다.
  • 열 유형을 변경하는 것과 같이 호환되지 않는 스키마 변경을 수행하려면 전체 테이블을 다시 작성해야 합니다.

Delta 테이블의 전체 디렉토리를 삭제하고 동일한 경로에 새 테이블을 만들 수는 있지만 권장되지 않습니다. 이유는 다음과 같습니다:

  • 디렉토리 삭제가 효율적이지 않습니다. 매우 큰 파일이 들어 있는 디렉토리는 몇 시간 또는 며칠이 걸릴 수 있습니다.
  • 삭제된 파일의 모든 내용이 손실되며 잘못된 테이블을 삭제하면 복구하기가 어렵습니다.
  • 디렉토리 삭제는 원자적이지 않습니다. 테이블을 삭제하는 동안 테이블을 읽는 동시 쿼리는 실패하거나 부분적인 테이블을 볼 수 있습니다.

테이블 스키마를 변경하지 않아도 된다면 Delta 테이블에서 데이터를 삭제하고 새 데이터를 삽입하거나, 잘못된 값을 수정하기 위해 테이블을 업데이트할 수 있습니다.

테이블 스키마를 변경하려면 전체 테이블을 원자적으로 교체할 수 있습니다. 예를 들어:

REPLACETABLE <your-table>USING DELTA PARTITIONEDBY (<your-partition-columns>)ASSELECT ... -- Managed table
REPLACETABLE <your-table>USING DELTA PARTITIONEDBY (<your-partition-columns>)LOCATION "<your-table-path>"ASSELECT ... -- External table
dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .partitionBy(<your-partition-columns>) \
  .saveAsTable("<your-table>") # Managed table
dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .partitionBy(<your-partition-columns>) \
  .saveAsTable("<your-table>") # External table

이 방법의 여러 가지 이점이 있습니다.

  • 디렉터리를 재귀적으로 나열하거나 파일을 삭제할 필요가 없기 때문에 테이블 덮어쓰기가 훨씬 빠릅니다.
  • 이전 버전의 테이블은 여전히 존재합니다. 잘못된 테이블을 삭제하더라도 Time Travel을 사용하여 이전 데이터를 쉽게 검색할 수 있습니다.
  • 원자적인 작업입니다. 테이블을 삭제하는 동안 동시 쿼리는 여전히 테이블을 읽을 수 있습니다.

Delta Lake ACID 트랜잭션 보장으로 인해 테이블 덮어쓰기가 실패하면 테이블은 이전 상태로 유지됩니다.

또한, 테이블을 덮어쓴 후 저장소 비용을 절약하려면 VACUUM을 사용하여 이전 파일을 삭제할 수 있습니다. 파일 삭제를 최적화하고 일반적으로 디렉토리 전체를 삭제하는 것보다 빠릅니다.

Spark caching

Databricks는 다음 이유로 Spark 캐싱 사용을 권장하지 않습니다:

  • 캐시된 DataFrame 위에 추가된 추가 필터에서 나오는 데이터 스킵을 잃게 됩니다.
  • 캐시된 데이터는 다른 식별자를 사용하여 테이블에 액세스하는 경우 업데이트되지 않을 수 있습니다. (예를 들어, spark.table(x).cache()를 수행하지만, spark.write.save(/some/path)를 사용하여 테이블을 쓰는 경우)
profile
Data/AI Solution Architect

0개의 댓글