Spark, Iceberg

Jeonghak Cho·2025년 3월 31일

Spark

목록 보기
8/12

📗아파치 아이스버그 활용

Apache Iceberg는 데이터 레이크를 위한 현대적인 테이블 포맷으로, 기존 Hive 기반의 데이터 관리 문제를 해결하고, ACID 트랜잭션, 스키마 진화, 타임 트래블, 최적화된 데이터 스캔 등의 기능을 제공한다.
Spark, Trino, Flink 등 다양한 엔진과 통합할 수 있어 데이터 엔지니어링, 분석, AI/ML 파이프라인 등에 폭넓게 활용될 수 있다.

🏳️‍🌈 [궁금한점]

  • Iceberg에 대한 이해, 활용 방법
    • 활용사례
    • DDM
    • DML
    • 기존 SQL과 차이나는 기능 ( 스냅샷, 칼럼추가 )

🔗[목차]

Apache Iceberg 개요

Apache Iceberg는 대규모 데이터 레이크에서 트랜잭션 기능, 스키마 진화, 타임 트래블 등을 지원하는 오픈소스 테이블 포맷이다. 기존의 Hive 테이블 포맷이 가진 성능 및 확장성 문제를 해결하기 위해 Netflix에서 개발하였으며, 현재는 Apache Software Foundation의 오픈소스 프로젝트로 운영되고 있다.

주요 특징

  • ACID 트랜잭션 지원
    Iceberg는 스냅샷 기반의 트랜잭션 모델을 사용하여 원자성(Atomicity), 일관성(Consistency), 고립성(Isolation), 지속성(Durability) (ACID) 트랜잭션을 보장한다.

여러 사용자가 동시에 데이터를 읽고 쓰더라도 일관된 데이터를 유지할 수 있다.

  • 스키마 진화(Schema Evolution)
    Iceberg는 테이블을 재생성하지 않고도 컬럼을 추가, 삭제, 변경할 수 있다.

예를 들어, ALTER TABLE ADD COLUMN 또는 DROP COLUMN 같은 연산이 기존 데이터에 영향을 주지 않는다.

  • 타임 트래블(Time Travel) 및 버전 관리
    Iceberg는 스냅샷(snapshot) 기반의 데이터 관리를 제공하여 특정 시점의 데이터를 조회할 수 있다.

이를 통해 데이터 정합성 유지뿐만 아니라 데이터 롤백 및 재현이 가능하다.

  • 효율적인 데이터 스캔 (Hidden Partitioning)
    Iceberg는 파일을 직접 탐색하지 않고도 빠르게 데이터 필터링할 수 있도록 설계되었다.

Hive와 달리 파티션 열을 명시적으로 저장하지 않고도 최적화된 스캔을 수행한다.

결과적으로 쿼리 성능이 향상되고, 중복된 데이터 스캔이 줄어든다.

  • 다양한 데이터 엔진과 통합 가능
    Iceberg는 Spark, Trino, Presto, Hive, Flink 등 여러 데이터 처리 엔진과 호환된다.

기존 Hive Metastore(HMS) 또는 새로운 Iceberg Catalog를 통해 테이블을 관리할 수 있다.

Apache Iceberg vs 기존 테이블 포맷 비교

비교 항목Apache IcebergApache HiveDelta Lake
ACID 트랜잭션(지원)(제한적)(지원)
스키마 진화(완전 지원)(제한적)(지원)
타임 트래블(스냅샷 기반)(미지원)(지원)
파티션 관리(자동 최적화)(수동 관리 필요)(지원)
파일 형식Parquet, ORC, AvroParquet, ORCParquet
데이터 처리 엔진Spark, Flink, Trino, Presto 등Hive 중심Spark 중심

활용 사례

대규모 데이터 레이크 관리

Iceberg는 S3, HDFS, Google Cloud Storage(GCS), Azure Blob Storage 등 다양한 클라우드 및 온프레미스 스토리지에서 대용량 데이터를 효율적으로 관리할 수 있다.

실시간 분석 및 데이터 웨어하우스 대체

Spark, Trino, Flink 등과 함께 사용하여 데이터 레이크 기반의 실시간 분석이 가능.
기존의 데이터 웨어하우스(Hive, Redshift, Snowflake)를 대체하는 솔루션으로 활용 가능.

CDC(Change Data Capture) 및 데이터 롤백

과거 데이터 조회(Time Travel) 및 데이터 롤백 기능을 통해 데이터 정합성을 유지하면서 변경 사항을 추적 가능.

머신러닝 및 AI 데이터 파이프라인

버전 관리 및 스냅샷 기능을 활용하여 모델 학습 데이터를 재현할 수 있음.

Apache Iceberg 설치 및 사용 예제

spark-shell 에서 Iceberg 사용하기

spark-shell \
--packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2 \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.type=hadoop \
--conf spark.sql.catalog.my_catalog.warehouse=s3a://mybucket/ \
--conf spark.hadoop.fs.s3a.access.key=4Kj6Q3rWIrab0PVHGQLh \
--conf spark.hadoop.fs.s3a.secret.key=j980yntyJFvGQuvBC6ItSmeGpA0O3usxSmzHFvG8 \
--conf spark.hadoop.fs.s3a.endpoint=http://172.18.0.3:9000 \
--conf spark.hadoop.fs.s3a.connection.maximum=100 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.multipart.size=104857600 \
--conf spark.hadoop.fs.s3a.maxConnections=100 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.fast.upload=true

설정 조회

spark.conf.getAll.foreach(println)
  • 카탈로그 설정
    spark.sql("SET spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog")
    spark.sql("SET spark.sql.catalog.my_catalog.type=hadoop")

테이블 생성

scala> spark.sql("DROP TABLE IF EXISTS my_catalog.my_db.my_table")

scala> spark.sql("CREATE DATABASE IF NOT EXISTS my_catalog.my_db")

scala> spark.sql("SHOW TABLES IN my_db").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

scala> spark.sql("""
     |   CREATE TABLE IF NOT EXISTS my_catalog.my_db.my_table (
     |     id INT,
     |     name STRING
     |   ) USING iceberg
     | """)
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SHOW TABLES IN my_catalog.my_db").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    my_db| my_table|      false|
+---------+---------+-----------+

spark.sql("SELECT * FROM my_catalog.my_db.my_table").show()

스키마 보기

scala> spark.sql("DESCRIBE TABLE my_catalog.my_db.my_table").show(false)
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|id      |int      |NULL   |
|name    |string   |NULL   |
+--------+---------+-------+

데이터 삽입

scala>  spark.sql("INSERT INTO my_catalog.my_db.my_table VALUES (1, 'Alice')")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SELECT * FROM my_catalog.my_db.my_table").show()
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
+---+-----+

타임 트래블을 이용한 특정 시점 데이터 조회

spark.sql("SELECT * FROM my_catalog.my_db.my_table TIMESTAMP AS OF '2024-03-01 12:00:00'").show()

스키마 변경 (컬럼 추가)

spark.sql("ALTER TABLE my_catalog.my_db.my_table ADD COLUMN email STRING")

scala> spark.sql("DESCRIBE TABLE my_catalog.my_db.my_table").show(false)
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|id      |int      |NULL   |
|name    |string   |NULL   |
|email   |string   |NULL   |
+--------+---------+-------+

버킷 확인

UI 콘솔 확인

  • 데이터

  • 메터 데이터

파일 시스템

ubuntu@DESKTOP-SCOK45O:~/minio/data$ tree
.
└── mybucket
    └── my_db
        └── my_table
            ├── data
            │   ├── 00000-0-31e1b362-ace4-4806-af1d-b247621cc3bf-00001.parquet
            │   │   └── xl.meta
            │   └── 00000-0-4b344c9a-383f-4b1a-b0cb-cad8c4a306ac-00001.parquet
            │       └── xl.meta
            └── metadata
                ├── 03648673-bbad-4d0a-96d6-4ea75f3aa19b-m0.avro
                │   └── xl.meta
                ├── 98095ae0-f432-483e-85c1-2fc88473c27e-m0.avro
                │   └── xl.meta
                ├── snap-1805372406519592918-1-03648673-bbad-4d0a-96d6-4ea75f3aa19b.avro
                │   └── xl.meta
                ├── snap-5357614036041547018-1-98095ae0-f432-483e-85c1-2fc88473c27e.avro
                │   └── xl.meta
                ├── v1.metadata.json
                │   └── xl.meta
                ├── v2.metadata.json
                │   └── xl.meta
                ├── v3.metadata.json
                │   └── xl.meta
                ├── v4.metadata.json
                │   └── xl.meta
                ├── v5.metadata.json
                │   └── xl.meta
                └── version-hint.text
                    └── xl.meta
108 directories, 98 files

0개의 댓글