Python awswrangler 를 이용하여 AWS Glue Iceberg table UPSERT 하는 방법

김재민·2025년 1월 16일

iceberg

목록 보기
1/3
post-thumbnail

Background


  • 1시간 배치 마다 S3 에 적재 되는 parquet 데이터를 Glue Iceberg table 에 UPSERT 하는 작업이 필요했다.
  • Glue job(spark), DMS, Flink 등등의 방법이 있었으나, 배치로 구현 되며, 데이터 갱신량이 많지 않아 python 라이브러리로만 구현 하였다.
  • 향후 개발 시 참고할 수 있도록 로직과 테스트 하였던 코드를 기록한다.

UPSERT Flow


  1. READ: awswrangler.s3.read_parquet 를 이용하여 1시간 마다 s3에 적재 되는 신규 또는 갱신된 데이터를 읽어 데이터프레임으로 만든다.
  2. CREATE: awswrangler.s3.to_parquet 를 이용하여 1번에서 만든 데이터프레임을 s3 임시 경로에 object 를 저장하고 Glue data catalog 에 임시 테이블(이하 temp_table)을 만든다.
  3. UPSERT: awswrangler.athena.start_query_execution 를 이용하여 Athena DML 에서 지원하는 MERGE INTO Query 로 Iceberg 테이블과 temp_table 데이터로 UPSERT 진행한다.
  4. DELETE: awswrangler.catalog.delete_table_if_exists 를 이용하여 Glue catalog 에 임시로 생성해둔 temp_table 을 삭제한다.
  5. DELETE: awswrangler.s3.delete_objects 를 이용하여 임시 s3 경로에 업로드 했던 object 를 삭제한다.

Example Code


from awswrangler.s3 import read_parquet, to_parquet, delete_objects
from awswrangler.athena import start_query_execution
from awswrangler.catalog import delete_table_if_exists


glue_database = 'my_database'
glue_iceberg_table = 'iceberg_table'
glue_temp_table = 'temp_table'
athena_workgroup = 'my_athena_workgroup'
s3_incremental_path = 's3://my-prod-bucket/data_prefix/version=1.0/year=2025/month=01/day=01'
s3_temp_path = 's3://my-temp-bucket/data_prefix/version=1.0/year=2025/month=01/day=01'

# incremental(신규or갱신 데이터) 데이터프레임 생성
df_new = read_parquet(path=s3_incremental_path, dataset=True)


# incremetnal 데이터를 임시 Glue Table 로 생성
to_parquet(
    df=df_new,
    path=s3_temp_path,
    dataset=True,
    mode="overwrite",
    database=glue_database,
    table=glue_temp_table
)

# MERGE INTO Query
query = f"""
    MERGE INTO {glue_iceberg_table} AS old 
    USING {glue_temp_table} AS new 
    ON old.col1 = new.col1
    WHEN NOT MATCHED THEN  -- old.col1 과 new.col1 이 일치하지 않는 데이터일 경우 INSERT 진행
    INSERT (
        col1,
        col2,
        col3,
        year,
        month,
        day
    )
    VALUES (
        new.col1,
        new.col2,
        new.col3,
        year,
        month,
        day
    )
    WHEN MATCHED AND (new.col2 > old.col2 ) THEN -- old.col1 과 new.col1 이 일치하는 데이터일 경우 UPDATE 진행
    UPDATE SET 
        col1 = new.col1,
        col2 = new.col2,
        col3 = new.col3,
        year = new.year,
        month = new.month,
        day = new.day
"""

# Iceberg 테이블 UPSERT(MERGE INTO)
start_query_execution(database=glue_database,
                      workgroup=athena_workgroup,
                      sql=query
                      )

# 임시 Glue table 삭제
delete_table_if_exists(database=glue_database, table=glue_temp_table)


# 임시 S3 object 삭제
delete_objects(path=s3_temp_path)

Other flow


  • awswrangler.s3.to_parquet 대신 awswrangler.s3.store_parquet_metadata 를 이용하면 한 번 더 s3 temp path 에 parquet 를 upload 하는 과정 없이 s3_incremental_path 경로 데이터를 직접 바라보고 임시 Glue data catalog 테이블을 만들 수 있다.
  • 단, awswrangler.s3.store_parquet_metadata 를 사용할 경우 dataset=True 로 설정 하여도 year, month, day 와 같이 s3 경로에서 파티션으로 지정해둔 값들을 Glue table 에 넣어주지 않는다.
  • 나의 경우 s3 경로상에 존재하는 파티션 정보를 같이 활용해야하기 때문에 위 예제 코드와 같이 사용했지만
  • 필요 없는 경우에는 awswrangler.s3.store_parquet_metadata 를 이용하여 Flow 단계를 줄이고, 불필요한 s3 업로드 과정을 생략할 수 있다.
profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글