Background
- 1시간 배치 마다 S3 에 적재 되는 parquet 데이터를 Glue Iceberg table 에 UPSERT 하는 작업이 필요했다.
- Glue job(spark), DMS, Flink 등등의 방법이 있었으나, 배치로 구현 되며, 데이터 갱신량이 많지 않아 python 라이브러리로만 구현 하였다.
- 향후 개발 시 참고할 수 있도록 로직과 테스트 하였던 코드를 기록한다.
UPSERT Flow
- READ:
awswrangler.s3.read_parquet 를 이용하여 1시간 마다 s3에 적재 되는 신규 또는 갱신된 데이터를 읽어 데이터프레임으로 만든다.
- CREATE:
awswrangler.s3.to_parquet 를 이용하여 1번에서 만든 데이터프레임을 s3 임시 경로에 object 를 저장하고 Glue data catalog 에 임시 테이블(이하 temp_table)을 만든다.
- UPSERT:
awswrangler.athena.start_query_execution 를 이용하여 Athena DML 에서 지원하는 MERGE INTO Query 로 Iceberg 테이블과 temp_table 데이터로 UPSERT 진행한다.
- DELETE:
awswrangler.catalog.delete_table_if_exists 를 이용하여 Glue catalog 에 임시로 생성해둔 temp_table 을 삭제한다.
- DELETE:
awswrangler.s3.delete_objects 를 이용하여 임시 s3 경로에 업로드 했던 object 를 삭제한다.
Example Code
from awswrangler.s3 import read_parquet, to_parquet, delete_objects
from awswrangler.athena import start_query_execution
from awswrangler.catalog import delete_table_if_exists
glue_database = 'my_database'
glue_iceberg_table = 'iceberg_table'
glue_temp_table = 'temp_table'
athena_workgroup = 'my_athena_workgroup'
s3_incremental_path = 's3://my-prod-bucket/data_prefix/version=1.0/year=2025/month=01/day=01'
s3_temp_path = 's3://my-temp-bucket/data_prefix/version=1.0/year=2025/month=01/day=01'
df_new = read_parquet(path=s3_incremental_path, dataset=True)
to_parquet(
df=df_new,
path=s3_temp_path,
dataset=True,
mode="overwrite",
database=glue_database,
table=glue_temp_table
)
query = f"""
MERGE INTO {glue_iceberg_table} AS old
USING {glue_temp_table} AS new
ON old.col1 = new.col1
WHEN NOT MATCHED THEN -- old.col1 과 new.col1 이 일치하지 않는 데이터일 경우 INSERT 진행
INSERT (
col1,
col2,
col3,
year,
month,
day
)
VALUES (
new.col1,
new.col2,
new.col3,
year,
month,
day
)
WHEN MATCHED AND (new.col2 > old.col2 ) THEN -- old.col1 과 new.col1 이 일치하는 데이터일 경우 UPDATE 진행
UPDATE SET
col1 = new.col1,
col2 = new.col2,
col3 = new.col3,
year = new.year,
month = new.month,
day = new.day
"""
start_query_execution(database=glue_database,
workgroup=athena_workgroup,
sql=query
)
delete_table_if_exists(database=glue_database, table=glue_temp_table)
delete_objects(path=s3_temp_path)
Other flow
awswrangler.s3.to_parquet 대신 awswrangler.s3.store_parquet_metadata 를 이용하면 한 번 더 s3 temp path 에 parquet 를 upload 하는 과정 없이 s3_incremental_path 경로 데이터를 직접 바라보고 임시 Glue data catalog 테이블을 만들 수 있다.
- 단,
awswrangler.s3.store_parquet_metadata 를 사용할 경우 dataset=True 로 설정 하여도 year, month, day 와 같이 s3 경로에서 파티션으로 지정해둔 값들을 Glue table 에 넣어주지 않는다.
- 나의 경우 s3 경로상에 존재하는 파티션 정보를 같이 활용해야하기 때문에 위 예제 코드와 같이 사용했지만
- 필요 없는 경우에는
awswrangler.s3.store_parquet_metadata 를 이용하여 Flow 단계를 줄이고, 불필요한 s3 업로드 과정을 생략할 수 있다.