Delta Lake Operation 04. Time Travel

Q·2024년 8월 21일

Delta Lake

목록 보기
6/8

Time Travel

Delta Table을 만들고 일부 데이터를 추가한 다음 덮어쓰기 작업을 수행하여 3가지 버전의 Delta Table을 구축

먼저, 버전 0을 생성하기 위해 Delta Table 생성

df = spark.range(0, 3)
df.repartition(1).write.format("delta").save("tmp/some_nums")

이제 Delta Table에 일부 데이터를 추가하면 버전 1이 생성

df = spark.range(8, 11)
df.repartition(1).write.mode("append").format("delta").save("tmp/some_nums")

마지막으로 Delta Table을 덮어쓰면 Delta Table의 버전 2가 생성

df = spark.createDataFrame([(55,), (66,), (77,)]).toDF("id")
df.repartition(1).write.mode("overwrite").format("delta").save("tmp/some_nums")

최신 Delta Table이 버전2 데이터로 구성되어 있는지 확인

spark.read.format("delta").load("tmp/some_nums").show()

+---+
| id|
+---+
| 55|
| 66|
| 77|
+---+

버전0으로 Time Travel을 하여 이전 버전의 Delta Table을 읽는 방법

spark.read.format("delta").option("versionAsOf", "0").load("tmp/some_nums").show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

버전1 Time Travel

spark.read.format("delta").option("versionAsOf", "1").load("tmp/some_nums").show()

+---+
| id|
+---+
|  8|
|  9|
| 10|
|  0|
|  1|
|  2|
+---+

versionAsOf을 사용하여 Delta Table의 최신 버전을 명시적으로 읽을 수도 있다.

spark.read.format("delta").option("versionAsOf", "2").load("tmp/some_nums").show()

+---+
| id|
+---+
| 55|
| 66|
| 77|
+---+

history 명령어로 Delta table의 모든 버전과 관련 timestamp 표시도 가능

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "tmp/some_nums")
delta_table.history().select("version", "timestamp", "operation").show(truncate=False)
+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|2      |2023-01-31 06:27:08.508|WRITE    |
|1      |2023-01-31 06:26:56.895|WRITE    |
|0      |2023-01-31 06:26:35.825|WRITE    |
+-------+-----------------------+---------+

다음 다이어그램은 방금 예시에서 Transaction에 대해 추가되고 제거된 파일을 보여준다.

  • 버전 0의 경우 Delta Lake는 file A만 읽으면 된다.
  • 버전 1의 경우 Delta Lake는 file A, B를 읽어야 된다.
  • 버전 2의 경우 Delta Lake는 file A, file B, file C가 추가되었지만 file A와 file B는 제거되었으므로 file C만 읽고 다른 file은 건너뛴다.

Timestamp를 기반으로 Time Travel 하는 방법

spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("tmp/some_nums")

Time Travel SQL 예제

-- 특정 버전
SELECT count(*) FROM my_table VERSION AS OF 5238
-- 특정 Timestamp
SELECT count(*) FROM my_table TIMESTAMP AS OF "2019-01-01"

Delta Lake는 현재 버전을 이전 버전으로 되돌리는 restore 명령을 지원한다.
Time Travle과의 차이점으로 Time Travle은 버전을 명시해줘야 하며 이전 버전을 읽어도 기본 설정은 현재 버전이다. 하지만 restore은 이전의 버전을 기본 설정으로 재설정 하기 때문에 기본 설정을 재설정 할때 적합하다.

참고 자료

profile
Data Engineer

0개의 댓글