Delta Lake Operation 03. Merge

Q·2024년 8월 21일

Delta Lake

목록 보기
5/8

Merge

merge 연산을 사용하는 경우

  • Slowly Changing Dimension 관리: 데이터의 부분적 업데이트 및 변경 이력 관리.
  • Change Data Capture: 외부 데이터 소스에서 수집된 변경 사항을 Delta Lake에 반영.
  • 동적 조건을 사용한 (INSERT, UPDATE, and DELETE): 여러 조건에 따라 데이터를 효율적으로 관리.
  • View 유지 관리: Delta 테이블로 저장된 View의 최신 상태를 유지.
  • GDPR 준수: 개인 데이터의 동적 관리 및 삭제.
  • merge의 whenNotMatchedInsert

기존 Delta Table 데이터셋과는 일치하지 않는 새로운 데이터셋이 들어왔을때 merge의 whenNotMatchedInsert으로 쉽게 처리 가능

Delta Table 생성

data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]

df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("delta").save("/tmp/people")

새로운 DF 생성

new_data = [
    (0, "Bob", 23),    # exists in our original dataset above
    (3, "Sally", 30),  # new data
    (4, "Henry", 33),  # new data
]

new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

기존 Delta Table 참조 후 whenNotMatchedInsert 절로 merge를 사용하여 기존 행과 일치하지 않는 새 행만 추가

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
    values={"id": "source.id", "name": "source.name", "age": "source.age"}
).execute()

whenNotMatchedInsertAll로 모든 source 필드의 값을 삽입하는 것도 가능

.whenNotMatchedInsertAll().execute()

결과 확인

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
|  3|Sally| 30|
|  4|Henry| 33|
+---+-----+---+

_delta_log 확인 결과 Delta Table은 각각 두 개의 별도 커밋에서 추가된 두 개의 parquet 파일로 구성되어 있다.
첫 번째 커밋은 merge 작업을 하기 이전의 Parquet File이며
두 번째 커밋은 merge 작업 이 후 생겨난 Parquet File이다.
이 것을 보면 Delta Lake는 append 연산으로 merge 작업으로도 이전 데이터를 overwrite하지 않고 append 하는 것을 확인할 수 있다.

> ls /tmp/people
_delta_log
part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet
part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet

> cat /tmp/people/_delta_log/00000000000000000000.json
…
{"add":{"path":"part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet", ... }

> cat /tmp/people/_delta_log/00000000000000000001.json
…
{"add":{"path":"part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet", ... }
  • merge의 whenMatchedUpdate

기존의 DF에서 만약 아래의 새로운 DF를 생성하여 merge 했을때 whenMatchedUpdate로 일치하는 것에 대한 조건을 넣어 줄 수 있다.
아래 코드에서는 .whenMatchedUpdate(set={"age": "source.age"}) 로 age를 update 한다.
whenNotMatchedInsert과 마찬가지로 whenMatchedUpdate에도 모든 source 필드의 값을 update 하는 whenMatchedUpdateAll 절이 존재한다.

new_data = [
    (4, "Henry", 34),
    (5, "Allie", 22),
]

new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(set={"age": "source.age"}).whenNotMatchedInsertAll().execute()

결과로 Henry의 나이가 update 되었는지 확인

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  3|Sally| 30|
|  4|Henry| 34|
|  5|Allie| 22|
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
+---+-----+---+
  • 만약 Delta Lake를 사용하지 않고 Parquet에서만 Table merge를 한다면?

동일한 데이터셋을 설정

data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]

df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("parquet").save("/tmp/parquet/people")

target = spark.read.format("parquet").load("/tmp/parquet/people")

new_data = [
    (0, "Bob", 23),
    (3, "Sally", 30),
    (4, "Henry", 33),
]

source = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)

두 데이터셋을 결합하고 source 데이터와 target 데이터 중 하나를 조건부로 선택하는 표현식을 수동으로 작성

source_prefix = source.select([F.col(c).alias("source_" + c) for c in source.columns])
target_prefix = target.select([F.col(c).alias("target_" + c) for c in target.columns])

joined_df = source_prefix.join(
    target_prefix, target_prefix.target_id == source_prefix.source_id, "full_outer"
)

final_df = joined_df.select(
    F.expr("CASE WHEN target_id IS NULL THEN source_id ELSE target_id END").alias("id"),
    F.expr("CASE WHEN target_name IS NULL THEN source_name ELSE target_name END").alias(
        "name"
    ),
    F.expr("CASE WHEN target_age IS NULL THEN source_age ELSE target_age END").alias(
        "age"
    ),
)

Parquet에서만 merge 작업을 한다면 기존 테이블 전체를 다시 작성해야하며 Delta Tabel에 비해 많이 느리다.

  • Delta Lake merge for full Change Data

데이터셋 준비

people_table.toDF().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
|  3|Sally| 30|
|  4|Henry| 33|
+---+-----+---+

new_data = [
    (9, "Richard", 75, "INSERT"),
    (3, "Sally", 31, "UPDATE"),
    (0, "Bob", 23, "DELETE"),
]


new_df = spark.createDataFrame(new_data).toDF("id", "name", "age", "_op").repartition(1)

new_df와 people_table의 데이터를 merge
whenNotMatchedInsert 절을 사용하여, _op 열의 값이 "INSERT"일 경우 새로운 데이터를 삽입
whenMatchedUpdate 절을 사용하여, _op 열의 값이 "UPDATE"일 경우 기존 데이터를 업데이트
whenMatchedDelete 절을 사용하여, _op 열의 값이 "DELETE"일 경우 기존 데이터를 삭제
whenMatchedDelete 절을 사용하여 _op 열의 값이 "DELETE"인 경우에 해당 행을 삭제

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
    condition='source._op = "INSERT"',
    values={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedUpdate(
    condition='source._op = "UPDATE"',
    set={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedDelete(
    condition='source._op = "DELETE"'
).execute()

결과 확인

people_table.toDF().show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  9|Richard| 75|
|  5|  Allie| 22|
|  1| SueNew| 25|
|  3|  Sally| 32|
|  2|    Jim| 27|
|  4|  Henry| 34|
+---+-------+---+

참고 자료

profile
Data Engineer

0개의 댓글