merge 연산을 사용하는 경우
- Slowly Changing Dimension 관리: 데이터의 부분적 업데이트 및 변경 이력 관리.
- Change Data Capture: 외부 데이터 소스에서 수집된 변경 사항을 Delta Lake에 반영.
- 동적 조건을 사용한 (INSERT, UPDATE, and DELETE): 여러 조건에 따라 데이터를 효율적으로 관리.
- View 유지 관리: Delta 테이블로 저장된 View의 최신 상태를 유지.
- GDPR 준수: 개인 데이터의 동적 관리 및 삭제.
기존 Delta Table 데이터셋과는 일치하지 않는 새로운 데이터셋이 들어왔을때 merge의 whenNotMatchedInsert으로 쉽게 처리 가능
Delta Table 생성
data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]
df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("delta").save("/tmp/people")
새로운 DF 생성
new_data = [
(0, "Bob", 23), # exists in our original dataset above
(3, "Sally", 30), # new data
(4, "Henry", 33), # new data
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
기존 Delta Table 참조 후 whenNotMatchedInsert 절로 merge를 사용하여 기존 행과 일치하지 않는 새 행만 추가
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
values={"id": "source.id", "name": "source.name", "age": "source.age"}
).execute()
whenNotMatchedInsertAll로 모든 source 필드의 값을 삽입하는 것도 가능
.whenNotMatchedInsertAll().execute()
결과 확인
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
| 3|Sally| 30|
| 4|Henry| 33|
+---+-----+---+
_delta_log 확인 결과 Delta Table은 각각 두 개의 별도 커밋에서 추가된 두 개의 parquet 파일로 구성되어 있다.
첫 번째 커밋은 merge 작업을 하기 이전의 Parquet File이며
두 번째 커밋은 merge 작업 이 후 생겨난 Parquet File이다.
이 것을 보면 Delta Lake는 append 연산으로 merge 작업으로도 이전 데이터를 overwrite하지 않고 append 하는 것을 확인할 수 있다.
> ls /tmp/people
_delta_log
part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet
part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet
> cat /tmp/people/_delta_log/00000000000000000000.json
…
{"add":{"path":"part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet", ... }
> cat /tmp/people/_delta_log/00000000000000000001.json
…
{"add":{"path":"part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet", ... }
기존의 DF에서 만약 아래의 새로운 DF를 생성하여 merge 했을때 whenMatchedUpdate로 일치하는 것에 대한 조건을 넣어 줄 수 있다.
아래 코드에서는 .whenMatchedUpdate(set={"age": "source.age"}) 로 age를 update 한다.
whenNotMatchedInsert과 마찬가지로 whenMatchedUpdate에도 모든 source 필드의 값을 update 하는 whenMatchedUpdateAll 절이 존재한다.
new_data = [
(4, "Henry", 34),
(5, "Allie", 22),
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(set={"age": "source.age"}).whenNotMatchedInsertAll().execute()
결과로 Henry의 나이가 update 되었는지 확인
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 3|Sally| 30|
| 4|Henry| 34|
| 5|Allie| 22|
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
+---+-----+---+
동일한 데이터셋을 설정
data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]
df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("parquet").save("/tmp/parquet/people")
target = spark.read.format("parquet").load("/tmp/parquet/people")
new_data = [
(0, "Bob", 23),
(3, "Sally", 30),
(4, "Henry", 33),
]
source = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
두 데이터셋을 결합하고 source 데이터와 target 데이터 중 하나를 조건부로 선택하는 표현식을 수동으로 작성
source_prefix = source.select([F.col(c).alias("source_" + c) for c in source.columns])
target_prefix = target.select([F.col(c).alias("target_" + c) for c in target.columns])
joined_df = source_prefix.join(
target_prefix, target_prefix.target_id == source_prefix.source_id, "full_outer"
)
final_df = joined_df.select(
F.expr("CASE WHEN target_id IS NULL THEN source_id ELSE target_id END").alias("id"),
F.expr("CASE WHEN target_name IS NULL THEN source_name ELSE target_name END").alias(
"name"
),
F.expr("CASE WHEN target_age IS NULL THEN source_age ELSE target_age END").alias(
"age"
),
)
Parquet에서만 merge 작업을 한다면 기존 테이블 전체를 다시 작성해야하며 Delta Tabel에 비해 많이 느리다.
데이터셋 준비
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
| 3|Sally| 30|
| 4|Henry| 33|
+---+-----+---+
new_data = [
(9, "Richard", 75, "INSERT"),
(3, "Sally", 31, "UPDATE"),
(0, "Bob", 23, "DELETE"),
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age", "_op").repartition(1)
new_df와 people_table의 데이터를 merge
whenNotMatchedInsert 절을 사용하여, _op 열의 값이 "INSERT"일 경우 새로운 데이터를 삽입
whenMatchedUpdate 절을 사용하여, _op 열의 값이 "UPDATE"일 경우 기존 데이터를 업데이트
whenMatchedDelete 절을 사용하여, _op 열의 값이 "DELETE"일 경우 기존 데이터를 삭제
whenMatchedDelete 절을 사용하여 _op 열의 값이 "DELETE"인 경우에 해당 행을 삭제
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
condition='source._op = "INSERT"',
values={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedUpdate(
condition='source._op = "UPDATE"',
set={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedDelete(
condition='source._op = "DELETE"'
).execute()
결과 확인
people_table.toDF().show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 9|Richard| 75|
| 5| Allie| 22|
| 1| SueNew| 25|
| 3| Sally| 32|
| 2| Jim| 27|
| 4| Henry| 34|
+---+-------+---+