Delta Lake 정리

Q·2024년 3월 30일

데이터 레이크하우스의 등장

2010년대까지 기업이 데이터 활용을 위해 관리하던 방식은 크게 데이터 웨어하우스(Data Warehouse, DW)와 데이터 레이크(Data Lake)로 나눌 수 있다. 두 개념 모두 데이터를 중앙적으로 관리하기 위해 등장한 개념으로 DW는 정형 데이터를 중심으로 데이터의 저장, 관리, 검색, 분석이 가능하다는 점에서 비즈니스 인사이트 도출에 중요한 역할을 담당했다.

하지만 정형 데이터 중심이라는 한계점으로 인해 비정형, 반정형 데이터 처리가 필요한 머신러닝 데이터 페르소나(Data Persona)의 요구사항을 충족하지 못하고 여전히 데이터 사일로(Data Silo) 문제가 발생한다는 문제점으로부터 우선 한곳에 모두 수집하고, 오브젝트 스토리지 형태로 정형을 비롯한 비정형, 반정형 데이터 처리가 가능한 데이터 레이크가 인기를 얻었다.

데이터 레이크는 많은 장점이 있었지만 DW와 달리 데이터의 관리, 접근이 용이하지 않아 쌓기는 하지만 정작 활용을 하지 못하는 데이터 늪(Data Swamp)이 되는 문제가 발생했다. 이런 배경에서 데이터 레이크 처럼 확장 가능한 오브젝트 스토리지 기반으로 저장하면서도 데이터의 관리 및 분석이 가능한 관리 방식의 필요성이 대두하면서 데이터 레이크하우스가 등장했다.

데이터 레이크하우스의 장점

데이터 레이크하우스는 데이터 웨어하우스와 데이터 레이크의 장점을 결합한 형태로 아래와 같은 장점이 있다.

  • 다양한 데이터 형태(정형, 반정형, 비정형)로 수집 및 저장이 가능하다.
  • 데이터의 관리 및 분석이 가능하기 때문에 데이터 늪이 되는 문제를 줄일 수 있다.
  • 다양한 데이터 페르소나의 요구사항을 충족할 수 있다(분석, 머신러닝 등).
  • Batch와 Streaming 워크로드의 결합이 가능하다.

Delta Lake

데이터 레이크하우스의 구현을 위해 개발된 여러 데이터 포맷이 있다. 대표적으로 Delta Lake와 Apache Iceberg가 있으며 이 글에서는 Delta Lake를 중심으로 살펴본다.

데이터레이크 구축의 핵심은 메타데이터 관리라고 생각한다. 잘못된 데이터가 계속 유입되거나 필요한 데이터가 어느 경로에 어떤 구조로 저장되어있는지 관리되지 않으면, 비용만 차지하는 데이터 늪이 되기 쉽다.

Delta Lake는 데이터 레이크 위에 Lakehouse 아키텍처를 구축할 수 있는 오픈소스 스토리지다. Spark를 개발한 UC버클리대학 AMP랩에서 설립한 Databricks라는 회사에서 주도하는 프로젝트로서, 데이터 레이크의 문제점과 데이터 웨어하우스의 문제점을 보완해줄 수 있다.

Delta Lake 특징

  • Delta Lake에서 모든 연산은 Append
  • ACID
  • Schema enforcement
  • Time travel
  • 성능
  • 비용

Delta Lake에서 모든 연산은 Append

Delta Lake는 모든 연산을 append로 동작한다. 연산마다 매번 새로운 오브젝트를 생성하는 방식으로 데이터가 저장된 경로를 살펴보면, 실제 데이터는 parquet 포맷으로 저장되며 트랜잭션 로그도 함께 저장된다. 데이터에 연산을 수행할 때마다 parquet 파일이 추가되며 트랜잭션 로그도 같이 append가 된다.

ACID

하나의 데이터 저장소에 동시에 여러 작업이 수행될 때 delta lake에서 데이터 일관성을 보장해준다.

예를 들어 사용자 A가 읽기 작업을 하면, 최근 트랜잭션 로그를 확인하여 최신 버전의 파일 블록들을 사용자 A에게 전달한다. 동시에 사용자 B가 쓰기 작업을 하고 있어도 이는 새로운 파일을 append하기 때문에 사용자 A의 읽기에는 영향이 없다. 사용자 B의 쓰기가 모두 완료된 시점에서 새로운 트랜잭션 로그가 추가되고 그 이후에 다른 사용자가 데이터를 조회할 때 새로운 버전의 파일이 제공된다.

만약 사용자 A와 사용자 B가 동시에 같은 파일 블록에 수정 작업을 하면 어떨까? Delta Lake는 먼저 작업이 종료된 작업을 트랜잭션 로그에 추가하며, 수정 작업을 뒤늦게 완료하고 트랜잭션 로그를 추가하려는 순간 기존 버전이 변경된 것을 인식하여 새로운 작업을 실패 처리해 준다. 분산 스토리지 시스템에서 언제나 일관된 데이터를 보장해주기 때문에 신뢰도가 높은 데이터를 유지 할 수 있다.

Spark에서 overwrite를 하는 과정은 아래와 같은 순서대로 수행된다.

  1. 데이터를 저장소에서 메모리로 읽기
  2. 데이터 메모리에서 수정
  3. 데이터를 저장소에서 삭제
  4. 메모리에서 수정된 데이터를 저장소에 저장

여기서 만약 데이터가 저장소에서 삭제된 순간 오류가 발생하여 작업이 종료된다면, 모든 데이터가 유실되게 된다. 그러나 Delta Lake를 사용하면, 작업이 중간에 종료된다 해도 기존 버전의 데이터가 계속 유지되기 때문에 안정적으로 overwrite를 사용할 수 있다.

Schema enforcement

Delta Lake는 스키마 enforcement 기능도 제공해준다. 데이터 소싱 시 데이터를 전달해주는 주체는 대부분 외부이다. 따라서 오염된 데이터가 입력된다고 해도 데이터 파이프라인 단계의 마지막 단계에서 데이터 정합성 문제가 확인되거나 최악의 경우에는 이러한 오류가 확인되지 못하고 지표에 반영되는 경우도 있을 수 있다.

# type mismatch error
>>> df.write.format("delta").mode("append").save("/tmp/delta-table") # 실패

# type casting
>>> df = df .withColumn("id", col("id").cast("integer"))
>>> df.write.format("delta").mode("append").save("/tmp/delta-table") # 성공

Delta Lake에서 schema enforcement를 사용하면 기존과 다른 스키마의 데이터가 append 되는 순간 에러를 발생시키기 때문에 데이터 유입 초기 단계에서 데이터 구조 변화를 확인할 수 있고 빠른 대응이 가능해진다.

Time travel

데이터가 바뀔 때마다 버저닝을 하는 특징을 활용해서, 데이터를 예전 버전으로 되돌릴 수 있다. 데이터를 잘못 처리하거나 실수로 데이터를 지웠어도 간단하게 아래와 같이 예전 데이터를 불러올 수 있다.

df = spark.read.format("delta").option("timestampAsOf", "2022-03-07 09:08:45.333").load("/tmp/delta-table")

데이터를 날짜별로 Dataframe에 불러와서 통계 분석 하는 방식으로 time travel을 활용할 수도 있다.

데이터 버전의 보존 기간은 직접 수동으로 설정할 수 있으며, vacuum 명령어를 사용하여 명시적으로 오래된 버전의 데이터를 영구 삭제해주는 기능도 제공하고 있다.

성능

Delta Lake는 압축, 데이터 스키핑, 캐싱 등을 활용해서 더욱더 빠른 쿼리 성능을 낼 수 있도록 시스템을 발전시키고 있다.

비용

AWS의 RDS는 저장 용량이 클수록 비용이 늘어나며, 시스템의 수평 확장이 어렵기 때문에, 데이터 증가에 따른 쿼리 비용 또한 크게 증가한다. Delta Lake로 전환 시 분산 저장 스토리지인 AWS S3를 사용하였는데, S3는 상대적으로 저장 비용이 저렴하며 네트워크 사용량에 따라 비용을 청구한다. 따라서 사용자 쿼리 레이어에는 시계열 쿼리에 최적화된 쿼리 엔진에 최종 집계된 데이터를 적재하여 이를 대시보드에 연결하는 방식으로 비용을 절감할 수 있다.

Delta VS Parquet

Delta Format과 Parquet Format의 차이는 무엇일까? Parquet는 많은 장점이 있지만 아래와 같은 단점이 있다.

  • 데이터 처리가 중간에 멈춘 경우 데이터의 일관성을 보장할 수 없다(참고 ACID).

  • 스트리밍 데이터 처리에 부적합하다. 스트리밍 워크로드의 경우 읽기 보다는 쓰기 성능이 중요한데, Parquet 포맷의 경우 읽기 성능이 최적화된 포맷이다.

Data Operation

Parquet에서 append와 overwrite

  • append

    df를 생성하고 parquet으로 저장

    columns = ["singer", "country"]
    data1 = [("feid", "colombia")]
    rdd1 = spark.sparkContext.parallelize(data1)
    df1 = rdd1.toDF(columns)
    df1.repartition(1).write.format("parquet").save("tmp/singers1")
    tmp/singers1
    ├── _SUCCESS
    └── part-00000-ffcc616b-4009-462a-a60d-9e2bd7130083-c000.snappy.parquet

    이후 또 다른 df를 만들어 parquet 테이블에 추가했을때

    ```python
    data2 = [("annita", "brasil")]
    rdd2 = spark.sparkContext.parallelize(data2)
    df2 = rdd2.toDF(columns)
    ```
    ```python
    df2.repartition(1).write.mode("append").format("parquet").save("tmp/singers1")
    ```
    ```
    tmp/singers1

    ├── _SUCCESS
    ├── part-00000-49da366f-fd15-481b-a3a4-8b3bd26ef2c7-c000.snappy.parquet
    └── part-00000-ffcc616b-4009-462a-a60d-9e2bd7130083-c000.snappy.parquet

    ```

  • overwrite
    df를 생성하고 overwrite로 저장했을때

    data3 = [("rihanna", "barbados")]
    rdd3 = spark.sparkContext.parallelize(data3)
    df3 = rdd3.toDF(columns)
    df3.repartition(1).write.mode("overwrite").format("parquet").save("tmp/singers1")
    +-------+--------+
    | singer| country|
    +-------+--------+
    |rihanna|barbados|
    +-------+--------+
    tmp/singers1
    ├── _SUCCESS
    └── part-00000-63531918-401d-4983-8848-7b99fff39713-c000.snappy.parquet

    savemode가 overwrite일때 paquet에서는 새 파일을 쓰고 기존 파일을 전부 삭제하는 것을 확인할 수 있다.

Delta lake에서 append와 overwrite

  • append

    위의 parquet과 동일하게 df를 생성하고 Delta table을 생성

    df1.repartition(1).write.format("delta").save("tmp/singers2")
    +------+--------+
    |singer| country|
    +------+--------+
    |  feid|colombia|
    +------+--------+
    tmp/singers2
    ├── _delta_log
    │   └── 00000000000000000000.json
    └── part-00000-946ae20f-fa5a-4e92-b1c9-49322594609a-c000.snappy.parquet

    이후 또 다른 df를 만들어 parquet 테이블에 추가했을때

    df2.repartition(1).write.mode("append").format("delta").save("tmp/singers2")
    +------+--------+
    |singer| country|
    +------+--------+
    |annita|  brasil|
    |  feid|colombia|
    +------+--------+
    tmp/singers2
    ├── _delta_log
    │   ├── 00000000000000000000.json
    │   └── 00000000000000000001.json
    ├── part-00000-946ae20f-fa5a-4e92-b1c9-49322594609a-c000.snappy.parquet
    └── part-00000-adda870a-83a2-4f5c-82a0-c6ecc60d9d2e-c000.snappy.parquet
  • overwrite

    df3.repartition(1).write.mode("overwrite").format("delta").save("tmp/singers2")
    +-------+--------+
    | singer| country|
    +-------+--------+
    |rihanna|barbados|
    +-------+--------+
    tmp/singers2
    ├── _delta_log
    │   ├── 00000000000000000000.json
    │   ├── 00000000000000000001.json
    │   └── 00000000000000000002.json
    ├── part-00000-2d176e2d-66e0-44b6-8922-6bc3a15a6b96-c000.snappy.parquet
    ├── part-00000-946ae20f-fa5a-4e92-b1c9-49322594609a-c000.snappy.parquet
    └── part-00000-adda870a-83a2-4f5c-82a0-c6ecc60d9d2e-c000.snappy.parquet

    Delta Lake의 overwrite 작업은 기본 Parquet 파일을 삭제하지 않으며 기존 파일을 무시하기 위해 트랜잭션 로그에 항목을 추가하고(논리적 삭제) 실제로 파일을 제거하지는 않는다(물리적 삭제).

Convert from Parquet to Delta Lake

  • Parquet to Delta Lake API

    df를 생성하고 parquet으로 저장

    columns = ["language", "num_speakers"]
    data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
    rdd = spark.sparkContext.parallelize(data)
    df = rdd.toDF(columns)
    df.write.format("parquet").save("tmp/lake1")
    tmp/lake1
    ├── _SUCCESS
    ├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    └── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

    그 후 Delta lake로 변환

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake1`")
    tmp/lake1
    ├── _SUCCESS
    ├── _delta_log
    │   ├── 00000000000000000000.checkpoint.parquet
    │   ├── 00000000000000000000.json
    │   └── _last_checkpoint
    ├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    └── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

    _delta_log가 Parquet 파일을 스캔하여 Delta Lake 데이터 쿼리에 필요한 메타데이터가 포함된 디렉토리를 빌드한다.

  • Partitioned Parquet table to Delta Lake API

    Partitioning 된 Parquet 테이블 생성

    df.write.partitionBy("language").format("parquet").save("tmp/lake2")
    tmp/lake2
    ├── _SUCCESS
    ├── language=English
    │   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    ├── language=Hindi
    │   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    └── language=Mandarin
        └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

    이 후 Delta Lake로 Convert

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`")

    하지만 이때 다음 에러 발생

    AnalysisException: Expecting 0 partition column(s): [], but found 1 partition column(s): [`language`] from parsing the file name: file:/.../delta-examples/notebooks/pyspark/tmp/lake2/language=English/part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

    에러가 발생한 이유는 Partition Column의 데이터 타입을 지정해주기 않아서 발생한 것으로
    Parquet 테이블에서 Partition Column의 데이터 타입은 디렉토리 이름에 따라 결정되며, 이는 모호할 수 있다.
    예를 들어, 디렉토리 이름 date=2022-09-21을 읽을 때 Delta Lake는 TIMESTAMP라고 지정해줘야 한다.

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`", "language STRING")
    tmp/lake2
    ├── _SUCCESS
    ├── _delta_log
    │   ├── 00000000000000000000.checkpoint.parquet
    │   ├── 00000000000000000000.json
    │   └── _last_checkpoint
    ├── language=English
    │   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    ├── language=Hindi
    │   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    └── language=Mandarin
        └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

Merge

merge 연산을 사용하는 경우

  • Slowly Changing Dimension 관리: 데이터의 부분적 업데이트 및 변경 이력 관리.
  • Change Data Capture: 외부 데이터 소스에서 수집된 변경 사항을 Delta Lake에 반영.
  • 동적 조건을 사용한 (INSERT, UPDATE, and DELETE): 여러 조건에 따라 데이터를 효율적으로 관리.
  • View 유지 관리: Delta 테이블로 저장된 View의 최신 상태를 유지.
  • GDPR 준수: 개인 데이터의 동적 관리 및 삭제.
  • merge의 whenNotMatchedInsert

기존 Delta Table 데이터셋과는 일치하지 않는 새로운 데이터셋이 들어왔을때 merge의 whenNotMatchedInsert으로 쉽게 처리 가능

Delta Table 생성

data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]

df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("delta").save("/tmp/people")

새로운 DF 생성

new_data = [
    (0, "Bob", 23),    # exists in our original dataset above
    (3, "Sally", 30),  # new data
    (4, "Henry", 33),  # new data
]

new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

기존 Delta Table 참조 후 whenNotMatchedInsert 절로 merge를 사용하여 기존 행과 일치하지 않는 새 행만 추가

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
    values={"id": "source.id", "name": "source.name", "age": "source.age"}
).execute()

whenNotMatchedInsertAll로 모든 source 필드의 값을 삽입하는 것도 가능

.whenNotMatchedInsertAll().execute()

결과 확인

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
|  3|Sally| 30|
|  4|Henry| 33|
+---+-----+---+

_delta_log 확인 결과 Delta Table은 각각 두 개의 별도 커밋에서 추가된 두 개의 parquet 파일로 구성되어 있다.
첫 번째 커밋은 merge 작업을 하기 이전의 Parquet File이며
두 번째 커밋은 merge 작업 이 후 생겨난 Parquet File이다.
이 것을 보면 Delta Lake는 append 연산으로 merge 작업으로도 이전 데이터를 overwrite하지 않고 append 하는 것을 확인할 수 있다.

> ls /tmp/people
_delta_log
part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet
part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet

> cat /tmp/people/_delta_log/00000000000000000000.json
…
{"add":{"path":"part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet", ... }

> cat /tmp/people/_delta_log/00000000000000000001.json
…
{"add":{"path":"part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet", ... }
  • merge의 whenMatchedUpdate

기존의 DF에서 만약 아래의 새로운 DF를 생성하여 merge 했을때 whenMatchedUpdate로 일치하는 것에 대한 조건을 넣어 줄 수 있다.
아래 코드에서는 .whenMatchedUpdate(set={"age": "source.age"}) 로 age를 update 한다.
whenNotMatchedInsert과 마찬가지로 whenMatchedUpdate에도 모든 source 필드의 값을 update 하는 whenMatchedUpdateAll 절이 존재한다.

new_data = [
    (4, "Henry", 34),
    (5, "Allie", 22),
]

new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(set={"age": "source.age"}).whenNotMatchedInsertAll().execute()

결과로 Henry의 나이가 update 되었는지 확인

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  3|Sally| 30|
|  4|Henry| 34|
|  5|Allie| 22|
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
+---+-----+---+
  • 만약 Delta Lake를 사용하지 않고 Parquet에서만 Table merge를 한다면?

동일한 데이터셋을 설정

data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]

df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("parquet").save("/tmp/parquet/people")

target = spark.read.format("parquet").load("/tmp/parquet/people")

new_data = [
    (0, "Bob", 23),
    (3, "Sally", 30),
    (4, "Henry", 33),
]

source = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

두 데이터셋을 결합하고 source 데이터와 target 데이터 중 하나를 조건부로 선택하는 표현식을 수동으로 작성

source_prefix = source.select([F.col(c).alias("source_" + c) for c in source.columns])
target_prefix = target.select([F.col(c).alias("target_" + c) for c in target.columns])

joined_df = source_prefix.join(
    target_prefix, target_prefix.target_id == source_prefix.source_id, "full_outer"
)

final_df = joined_df.select(
    F.expr("CASE WHEN target_id IS NULL THEN source_id ELSE target_id END").alias("id"),
    F.expr("CASE WHEN target_name IS NULL THEN source_name ELSE target_name END").alias(
        "name"
    ),
    F.expr("CASE WHEN target_age IS NULL THEN source_age ELSE target_age END").alias(
        "age"
    ),
)

Parquet에서만 merge 작업을 한다면 기존 테이블 전체를 다시 작성해야하며 Delta Tabel에 비해 많이 느리다.

  • Delta Lake merge for full Change Data

데이터셋 준비

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
|  3|Sally| 30|
|  4|Henry| 33|
+---+-----+---+

new_data = [
    (9, "Richard", 75, "INSERT"),
    (3, "Sally", 31, "UPDATE"),
    (0, "Bob", 23, "DELETE"),
]


new_df = spark.createDataFrame(new_data).toDF("id", "name", "age", "_op").repartition(1)

new_df와 people_table의 데이터를 merge
whenNotMatchedInsert 절을 사용하여, _op 열의 값이 "INSERT"일 경우 새로운 데이터를 삽입
whenMatchedUpdate 절을 사용하여, _op 열의 값이 "UPDATE"일 경우 기존 데이터를 업데이트
whenMatchedDelete 절을 사용하여, _op 열의 값이 "DELETE"일 경우 기존 데이터를 삭제
whenMatchedDelete 절을 사용하여 _op 열의 값이 "DELETE"인 경우에 해당 행을 삭제

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
    condition='source._op = "INSERT"',
    values={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedUpdate(
    condition='source._op = "UPDATE"',
    set={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedDelete(
    condition='source._op = "DELETE"'
).execute()

결과 확인

people_table.toDF().show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  9|Richard| 75|
|  5|  Allie| 22|
|  1| SueNew| 25|
|  3|  Sally| 32|
|  2|    Jim| 27|
|  4|  Henry| 34|
+---+-------+---+

Time Travel

Delta Table을 만들고 일부 데이터를 추가한 다음 덮어쓰기 작업을 수행하여 3가지 버전의 Delta Table을 구축

먼저, 버전 0을 생성하기 위해 Delta Table 생성

df = spark.range(0, 3)
df.repartition(1).write.format("delta").save("tmp/some_nums")

이제 Delta Table에 일부 데이터를 추가하면 버전 1이 생성

df = spark.range(8, 11)
df.repartition(1).write.mode("append").format("delta").save("tmp/some_nums")

마지막으로 Delta Table을 덮어쓰면 Delta Table의 버전 2가 생성

df = spark.createDataFrame([(55,), (66,), (77,)]).toDF("id")
df.repartition(1).write.mode("overwrite").format("delta").save("tmp/some_nums")

최신 Delta Table이 버전2 데이터로 구성되어 있는지 확인

spark.read.format("delta").load("tmp/some_nums").show()

+---+
| id|
+---+
| 55|
| 66|
| 77|
+---+

버전0으로 Time Travel을 하여 이전 버전의 Delta Table을 읽는 방법

spark.read.format("delta").option("versionAsOf", "0").load("tmp/some_nums").show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

버전1 Time Travel

spark.read.format("delta").option("versionAsOf", "1").load("tmp/some_nums").show()

+---+
| id|
+---+
|  8|
|  9|
| 10|
|  0|
|  1|
|  2|
+---+

versionAsOf을 사용하여 Delta Table의 최신 버전을 명시적으로 읽을 수도 있다.

spark.read.format("delta").option("versionAsOf", "2").load("tmp/some_nums").show()

+---+
| id|
+---+
| 55|
| 66|
| 77|
+---+

history 명령어로 Delta table의 모든 버전과 관련 timestamp 표시도 가능

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "tmp/some_nums")
delta_table.history().select("version", "timestamp", "operation").show(truncate=False)
+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|2      |2023-01-31 06:27:08.508|WRITE    |
|1      |2023-01-31 06:26:56.895|WRITE    |
|0      |2023-01-31 06:26:35.825|WRITE    |
+-------+-----------------------+---------+

다음 다이어그램은 방금 예시에서 Transaction에 대해 추가되고 제거된 파일을 보여준다.

  • 버전 0의 경우 Delta Lake는 file A만 읽으면 된다.
  • 버전 1의 경우 Delta Lake는 file A, B를 읽어야 된다.
  • 버전 2의 경우 Delta Lake는 file A, file B, file C가 추가되었지만 file A와 file B는 제거되었으므로 file C만 읽고 다른 file은 건너뛴다.

Timestamp를 기반으로 Time Travel 하는 방법

spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("tmp/some_nums")

Time Travel SQL 예제

-- 특정 버전
SELECT count(*) FROM my_table VERSION AS OF 5238
-- 특정 Timestamp
SELECT count(*) FROM my_table TIMESTAMP AS OF "2019-01-01"

Delta Lake는 현재 버전을 이전 버전으로 되돌리는 restore 명령을 지원한다.
Time Travle과의 차이점으로 Time Travle은 버전을 명시해줘야 하며 이전 버전을 읽어도 기본 설정은 현재 버전이다. 하지만 restore은 이전의 버전을 기본 설정으로 재설정 하기 때문에 기본 설정을 재설정 할때 적합하다.

Optimize

작은 파일들은 쿼리 읽기 속도를 늦추기 때문에 문제가 될 수 있으며 많은 작은 파일을 나열하고, 열고, 닫으면 비용이 많이 드는 오버헤드가 발생한다. 이를 Small File Problem 이라고 하며 데이터를 더 크고 효율적인 파일에 결합하면 이 오버헤드를 줄일 수 있다.

아래 코드는 200만 개의 행이 있는 Delta 테이블에 대한 쿼리를 실행한다. education Column에 따라 Partition 되어 있으며 Partition당 1440개의 파일이 있다.

    %%time
    df = spark.read.format("delta").load("test/delta_table_1440")
    res = df.where(df.education == "10th").collect()

    CPU times: user 175 ms, sys: 20.1 ms, total: 195 ms
    Wall time: 16.1 s

그리고 2백만 행의 데이터를 가지고, 각 파티션에 대해 단 하나의 최적화된 파일로 저장된 Delta 테이블에서 동일한 쿼리를 실행해보면

    %%time
    df = spark.read.format("delta").load("test/delta_table_1")
    res = df.where(df.education == "10th").collect()

    CPU times: user 156 ms, sys: 16 ms, total: 172 ms
    Wall time: 4.62 s

이 쿼리가 훨씬 빠르게 실행되는 것을 볼 수 있다.

small file 발생 이유

  • 사용자 오류: 사용자가 데이터 세트를 다시 분할하고 여러 개의 작은 파일로 데이터를 쓸 수 있다.
  • 불변 파일 작업: Parquet과 같은 불변 파일 형식은 덮어쓸 수 없다. 즉, 이러한 Data Set을 업데이트하면 새 파일이 생성되어 작은 파일 문제가 발생할 수 있다.
  • 파티셔닝: 높은 카디널리티 열에 테이블을 파티셔닝하면 많은 작은 파일로 구성된 디스크 파티션이 생성될 수 있다.
  • 빈번한 증분 업데이트: 빈번한 증분 업데이트가 있는 테이블은 많은 작은 파일을 가질 가능성이 높다. 2분마다 업데이트되는 테이블은 한 주당 5040개의 파일들을 생성한다.

Delta Lake Optimize

  • Manual Optimize

매일 일부 데이터를 수집하는 ETL 파이프라인이 있다고 가정하고 매일 작업이 끝나면 파티션당 1,440개의 파일이 생성된다고 했을때

    > # get n files per partition
    > !ls test/delta_table/education\=10th/*.parquet | wc -l
    1440

위에서 보았듯이, 많은 작은 파일이 있는 Delta 테이블에 대해 쿼리를 실행하는 것은 효율적이지않다.

이 Delta Table을 Delta OPTIMIZE 명령을 수동으로 실행하여 파일 수를 최적화할 수 있다.
이 작업 후 각 파티션에 기록되는 기본 파일 크기는 1GB이다.

    from delta.tables import *
    deltaTable = DeltaTable.forPath(spark, "test/delta_table")
    deltaTable.optimize().executeCompaction()
  • Optimized Write ( Delta 3.1.0 부터 사용가능 )

Optimized Write은 실행하기 전에 동일한 파티션에 대한 모든 작은 쓰기를 단일 쓰기 명령으로 결합한다.
이는 여러 프로세스가 동일한 분할된 델타 테이블에 쓸 때, 즉 분산 쓰기 작업일 때 매우 유용하다.

Optimized Write는 파일을 테이블에 쓰기 전에 데이터 셔플을 사용하여 데이터를 재조정한다.
이렇게 하면 작은 파일의 수가 줄어든다.

Delta Table을 write할때 optimizeWrite 옵션을 설정하면 활성화 할 수 있다.

df.write.format("delta").option("optimizeWrite", "True").save("path/to/delta")

만약 Delta Table 전체에 대해 Optimized Write를 활성화 하고 싶다면 delta.autoOptimize.optimizeWrite 속성을 설정하면 되며 Spark SQL에서도 spark.databricks.delta.optimizeWrite.enabled를 설정하면 Optimized Write를 활성화 할 수 있다. 하지만 SQL에서는 데이터가 쓰여지기 전에 수행되는 데이터 셔플 때문에 실행하는 데 약간 더 오래걸려 이 기능이 기본적으로 활성화되지 않는다.

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.master("local[4]").appName("parallel") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
df = spark.read.csv("data/census_2M.csv", header=True)

여러개의 작은 파일이 있는 상황을 시뮬레이션 하기 위해 repartitioning

df = df.repartition(1440)

이후 Delta Table에 데이터를 education을 기준으로 병렬로 write

df.write.format("delta").partitionBy("education").save("delta/census_table_many/")

partition당 디스크에 1440개의 파일이 있는 것을 확인

> # get n files per partition
> !ls delta/census_table_many/education\=10th/*.parquet | wc -l

1440

여러 개의 작은 파일이 있는 이 Delta 테이블에 대한 쿼리 실행

> df_small = spark.read.format("delta").load("delta/census_table_many")

> %%time
> df_10th = df_small.where(df_small.education == "10th").collect()

CPU times: user 175 ms, sys: 20.1 ms, total: 195 ms
Wall time: 16.1 s

이제 최적화된 쓰기를 사용하여 동일한 데이터를 써보면 write 작업 전에 데이터 셔플을 실행하여 작은 파일 문제를 피할 수 있다.

df.write.format("delta").partitionBy("education").option("optimizeWrite", "True").save("delta/census_table_optimized/")

모든 데이터가 Partition당 1개의 파일로 결합되어있는 것을 확인

> # get n files per partition
> !ls delta/census_table_optimized/education\=10th/*.parquet | wc -l

1

최적화된 Delta Table에 대한 쿼리 실행 결과 성능이 4.5배 향상된 것을 확인

> df_opt = spark.read.format("delta").load("delta/census_table_optimized")

> %%time
> df_10th = df_opt.where(df_opt.education == "10th").collect()

CPU times: user 146 ms, sys: 30.3 ms, total: 177 ms
Wall time: 3.66 s
  • Auto Compaction ( Delta 3.1.0 부터 사용가능 )

Optimized Write으로도 Small File Problem을 해결하기에 충분하지 않을때가 있다.
예를 들어, table에 자주 작은 업데이트를 write할때 파일은 Optimized Write 후에도 여전히 작은 파일로 남아있게된다.

Auto Compaction은 특정 임계값 크기 미만의 파일 데이터는 자동으로 더 큰 파일로 결합하여 다운스트림 쿼리는 더 최적의 파일 크기의 이점을 얻을 수 있게 한다.

Delta 테이블(delta.autoOptimize.autoCompact)이나 전체 Spark 세션(spark.databricks.delta.autoCompact.enabled)에 대해 자동 압축을 활성화할 수 있으며 최소한 특정 수의 작은 파일이 있는 파티션이나 테이블에 대해서만 하는 자동 압축을 트리거하는 데 필요한 최소 파일 수는 (spark.databricks.delta.autoCompact.minNumFiles.)을 사용하여 구성할 수 있다.

간단한 예시를 위해 위에서 사용한 동일한 200만 행 데이터 세트로 시작하여 매분 쓰기가 수행되는 상황을 시뮬레이션하기 위해 다시 Partition한다.

df = spark.read.csv("data/census_2M.csv", header=True)
df = df.repartition(1440)

Auto Compaction 활성화

spark.sql("SET spark.databricks.delta.autoCompact.enabled=true")

Delta Table write

df.write.format("delta").partitionBy("education").save("delta/census_table_compact/")

쿼리 수행을 해보면 최적화 후에 성능이 향상되는 것을 확인할 수 있다.

df_comp = spark.read.format("delta").load("delta/census_table_compact")

%%time
df_10th = df_comp.where(df_comp.education == "10th").collect()
CPU times: user 205 ms, sys: 39.2 ms, total: 244 ms
Wall time: 4.69 s
  • Optimized Write vs Auto Compaction

Optimized Write는 동일한 파티션에 대한 여러 작은 쓰기를 하나의 더 큰 쓰기 작업으로 결합다. 이는 데이터가 Delta 테이블에 쓰여지기 전에 수행되는 최적화이다.

Auto Compaction은 많은 작은 파일을 더 크고 효율적인 파일로 결합한다. 이는 데이터가 Delta 테이블에 기록된 후 수행되는 최적화이다. 이 후 나머지 작은 파일을 정리하기 위해 그 후에 VACUUM 작업을 수행을 권장 한다.

  • Delta Lake Optimize maxFileSize

spark.databricks.delta.optimize.maxFileSize 옵션을 사용하여 최대 파일크기를 조정할 수 있다. 기본 옵션은 1GB이며 굳이 이걸 벗어날 필요는 없다.

  • Delta Lake Optimize: Tradeoffs

낮은 쓰기 지연 시간이 필요한 경우 Delta Lake 최적화는 적합하지 않을 수 있으며 더 빠른 읽기 성능으로부터 이익을 얻을 수 있는 다운스트림 쿼리가 많은 경우 최적화를 실행하는 것이 일반적으로 합리적이다.

Vacuum

Delta Lake는 논리적으로 파일을 삭제하는 작업을 위해 저장소에서 파일을 물리적으로 제거하지 않으며 삭제로 표시되었고 보존 기간보다 오래된 파일을 저장소에서 물리적으로 제거하려면 vacuum 명령을 사용해야 한다.

Delta Table 생성

df = spark.createDataFrame([("bob", 3), ("sue", 5)]).toDF("first_name", "age")
df.repartition(1).write.format("delta").saveAsTable("some_people")

데이터 추가

df = spark.createDataFrame([("ingrid", 58), ("luisa", 87)]).toDF("first_name", "age")
df.repartition(1).write.format("delta").mode("append").saveAsTable("some_people")

현재 Delta Table 내용

spark.table("some_people").show()

+----------+---+
|first_name|age|
+----------+---+
|    ingrid| 58|
|     luisa| 87|
|       bob|  3|
|       sue|  5|
+----------+---+

Delta Table은 현재 두 개의 Parquet 파일로 구성되어 있으며, 둘 다 Delta Table의 최신 버전을 읽을 때 사용된다. 두 Parquet 파일 모두 Delta 트랜잭션 로그에서 제거하도록 표시되지 않았으므로(일명 "tombstoned"), vacuum으로 제거되지 않는다. Tombstoned 파일은 트랜잭션 로그에서 제거하도록 표시되며 Vacuum 명령은 tombstoned된 파일만 저장소에서 삭제한다.

some_people
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── part-00000-0e9cf175-b53d-4a1f-b132-8f71eacee991-c000.snappy.parquet
└── part-00000-9ced4666-4b26-4516-95d0-6e27bc2448e7-c000.snappy.parquet

기존 델타 테이블을 새로운 데이터로 Overwrite하면 트랜잭션 로그에서 제거할 모든 기존 데이터를 표시한다. (모든 기존 파일을 삭제 표시)

df = spark.createDataFrame([("jordana", 26), ("fred", 25)]).toDF("first_name", "age")
df.repartition(1).write.format("delta").mode("overwrite").saveAsTable("some_people")

Overwrite 후 Delta Talbe의 내용은 다음과 같으며

spark.table("some_people").show()

+----------+---+
|first_name|age|
+----------+---+
|   jordana| 26|
|      fred| 25|
+----------+---+

이미지에서 볼 수 있듯이 Overwrite 작업으로 새 파일이 추가되고 기존 파일은 삭제 마크가 되어있는 것을 볼 수있으며

세 개의 파일이 모두 아직 저장되어 있는 것을 볼 수 있다.

spark-warehouse/some_people
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-0e9cf175-b53d-4a1f-b132-8f71eacee991-c000.snappy.parquet
├── part-00000-1009797a-564f-4b0c-8035-c45354018f21-c000.snappy.parquet
└── part-00000-9ced4666-4b26-4516-95d0-6e27bc2448e7-c000.snappy.parquet

vacuum command 에서 DRY RUN 모드로 제거될 파일 목록을 조회 해보면

spark.sql("VACUUM some_people DRY RUN")

Found 0 files and directories in a total of 1 directories that are safe to delete.(총 1개의 디렉토리에서 삭제해도 안전한 0개의 파일과 디렉토리를 찾았습니다.) 라는 message를 return 한다. 이 이유는 기본적인 파일 보존 기간이 7일 이므로 기간이 지난 파일이 없기 때문이다.

따라서 만약 보존 기간을 0시간으로 설정하고 제거될 파일 목록을 조회 해보면

spark.sql("VACUUM some_people RETAIN 0 HOURS DRY RUN")

IllegalArgumentException: requirement failed: Are you sure you would like to vacuum files with such a low retention period?(IllegalArgumentException: 요구 사항 실패: 보존 기간이 이렇게 짧은 파일을 정말 청소하시겠습니까?) 라는 message를 return 한다. 왠만해서 데모가 아니라면 보관 기간을 0으로 설정하면 안된다.

만약 vacuum 설정을 하려는 테이블이 insert/upsert/delete/optimize와 같은 작업이 수행되지 않는다고 확신하는 경우 spark.databricks.delta.retentionDurationCheck.enabled = false 설정을 하여 이 검사를 끌 수 있다. 확실하지 않는 경우 기본 값인 "168시간" 이상의 값을 사용하는 것을 권장한다.

보존 기간을 0으로 설정할 수 있도록 구성을 업데이트하는 방법은 다음과 같다.

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

이제 구성이 업데이트되었으므로 보관 기간을 0시간으로 설정하여 진공 작업을 실행할 수 있다.

spark.sql("VACUUM some_people RETAIN 0 HOURS").show(truncate=False)

위 쿼리를 실행하면 저장소에서 제거된 파일을 보여주는 DF를 반환한다.

.../part-00000-0e9cf175-b53d-4a1f-b132-8f71eacee991-c000.snappy.parquet
...//part-00000-9ced4666-4b26-4516-95d0-6e27bc2448e7-c000.snappy.parquet

저장소에서 제거되었는지 확인

spark-warehouse/some_people
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
└── part-00000-1009797a-564f-4b0c-8035-c45354018f21-c000.snappy.parquet
  • Vacuum은 Time Travel을 제한한다.

Vacuum은 최신 Delta 테이블 버전에서 사용하는 파일을 제거하지 않지만, 이전 버전의 Delta 테이블에서 사용하는 삭제된 파일은 제거한다.

만약 Vacuum 후 Delta Table 버전 1을 읽으려고 한다면

spark.sql("SELECT * FROM some_people VERSION AS OF 1").show()

다음과 같은 오류 메시지가 나타난다.

ERROR Executor: Exception in task 0.0 in stage 237.0 (TID 113646)
java.io.FileNotFoundException:
File: .../some_people/part-00000-0e9cf175-b53d-4a1f-b132-8f71eacee991-c000.snappy.parquet does not exist

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

이 에러가 발생한 이유는 Vacuum으로 버전 1의 Parquet 파일이 물리적으로 제거되었기 때문이다.

  • Vacuum은 성능 최적화가 아니다

Delta Lake 사용자는 Delta Table을 Vacuum 하면 query가 더 빨리 실행될 것이라고 잘못 생각할 수 있지만 잘못된 생각이다.

Delta Table을 query할 때 query engine은 먼저 트랜잭션 로그를 구문 분석하여 읽어야 할 파일을 결정한 다음 query에 필요한 관련 파일을 CherryPick 한다.

query SELECT * FROM some_people VERSION AS OF 2예로 들어 보자.

이 query는 트랜잭션 로그를 구문 분석하고 버전 2의 모든 데이터를 얻기 위해 어떤 파일을 읽어야 하는지 파악한다. 그런 다음 query engine은 쿼리를 실행하는 데 필요한 파일을 읽는다.

Delta Lake는 필요한 파일만 읽는다. 저장소에 삭제된 파일이 있어도 쿼리 성능에 영향을 미치지 않는다. 삭제된 파일은 단순히 무시된다.

이것이 Delta Table을 Vacuum 성능이 향상되지 않는 이유이다. Vacuum은 보관 비용을 절약하는 데 도움이 될 뿐이다.

참고 자료

profile
Data Engineer

0개의 댓글