PySpark 에서 AWS Glue data catalog 에 등록 되어 있는 Iceberg table 로드하는 방법

김재민·2025년 2월 7일
post-thumbnail

Background


  • AWS Glue data catalog 에서 Iceberg table 을 관리 중이다.
  • Iceberg 도 처음 사용해보고, PySpark 에서 테이블 Read 하는 것도 처음 시도 하여서 관련 내용을 기록하려 한다.

Local 작업 시


  • Mac OS M1 환경에서 Docker 기반 Glue job(spark) 작업 환경을 구성하고 Iceberg 테이블을 Read 할 수 있다.

Docker 기반 Glue Job(spark) 작업 환경 구성

$ export MY_WORK_DIR='나의 작업 경로'

$ docker run -it \
  -v ~/.aws:/home/glue_user/.aws \
  -v $MY_WORK_DIR:/home/glue_user/workspace/jupyter_workspace/ \
  -e DISABLE_SSL=true \
  -e DATALAKE_FORMATS=iceberg \
  -p 4040:4040 \
  -p 18080:18080 \
  -p 8998:8998 \
  -p 8888:8888 \
  --name glue_jupyter_lab \
  --restart always \
  amazon/aws-glue-libs:glue_libs_4.0.0_image_01 \
  /home/glue_user/jupyter/jupyter_start.sh
  
# 컨테이너가 실행된 후 localhost:8888/lab 포트로 들어가서 Jupyter lab 을 사용하면 된다.

AWS 인증 권한 설정

  • AWS S3 등에 접근해야 하므로 아래와 같이 Jupyter lab 에서 환경변수를 추가해 두자.(서버에서 직접 추가해도 된다.)
!export AWS_ACCESS_KEY_ID="MY_AWS_ACCESS_KEY_ID"
!export AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY"
!export AWS_SESSION_TOKEN="MY_AWS_SESSION_TOKEN"

PySpark config & Read iceberg table


  • Spark config 에서 my_catalog 로 되어 있는 부분은 코드 실행 시 내부적으로 등록해서 임시로 사용할 catalog 이름이다. 커스텀하게 사용하면 된다.
  • spark.sql.catalog.my_catalog.warehouse 옵션에는 iceberg 테이블의 s3 location 또는 테이블들이 포함 되어 있는 경로 정도로 지정해주면 되겠다.
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark import SparkConf
from pyspark.sql import SparkSession

my_iceberg_warehouse_location="s3://my_bucket/my_prefix/"
my_region = "ap-northeast-2"
my_s3_endpoint = f"s3.{my_region}.amazonaws.com"

spark = SparkSession.builder \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.my_catalog.warehouse", my_iceberg_warehouse_location) \
    .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.hadoop.fs.s3a.endpoint", my_s3_endpoint \
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
spark = glueContext.spark_session
job = Job(glueContext)


# Glue Data Catalog에 등록된 Iceberg 테이블 읽기
# 테이블 참조 방식: <catalog>.<database>.<table>
table_identifier = "my_catalog.my_database.my_iceberg_table"

# 방법 1: Spark DataFrameReader를 이용하여 읽기
df = spark.read.format("iceberg").load(table_identifier)

df.show(5)

>>>> 출력 값
+---+----------+
|id |name      |
+---+----------+
|1 |BASEBALL   |
|2 |BASKETBALL |
|3 |SOCCER     |
|4 |MMA        |
|5 |SWIMMING   |
+---+----------+
only showing top 5 rows

만약 Iceberg 관련 패키지 종속성 오류가 발생 할경우 아래 두 옵션을 추가해주는 형태로 대응할 수 있다.

spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.7.1,org.apache.iceberg:iceberg-aws-bundle:1.7.1") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \

AWS Glue Job 작업 시


  • 대부분의 코드 내용은 로컬 작업과 동일하다.
  • Glue job 에 사용되는 IAM Role 에 Iceberg table s3 location 에 s3:GetObject 등의 권한이 필요하니 Policy 를 추가해 두어야 한다.
  • Glue job parameter 로 --datalake-formats 값을 iceberg 로 추가해서 실행 하면 된다.

참고 문서

profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글