26A27d

Young-Kyoo Kim·2026년 4월 27일

MinIO AIStor Tables는 Apache Iceberg 테이블 포맷을 기반으로 하며, 객체 스토리지의 유연성과 관계형 데이터베이스의 강력한 쿼리 성능을 결합한 구조입니다. 제공해주신 문서를 바탕으로 Inventory API로 수집된 Parquet 파일을 Iceberg 기반 AIStor Table로 변환하고, 이를 Polaris 및 다양한 엔진(Trino, Spark)과 연계하여 활용하는 아키텍처를 설계해 드립니다.


1. AIStor Table 생성 및 전환 프로세스 (Pipeline Design)

Inventory API를 통해 수집된 Parquet 파일은 단순한 '객체' 상태입니다. 이를 고성능 분석이 가능한 'AIStor Table'로 승격시키는 과정입니다.

Step 1: Landing Zone 저장 (Inventory API)

  • 작업: Inventory API가 소스 시스템에서 데이터를 수집하여 MinIO의 landing-bucket에 Parquet 포맷으로 저장합니다.
  • 특징: 이 단계에서는 아직 Iceberg 메타데이터가 없는 순수 데이터 파일 상태입니다.

Step 2: Iceberg Table Schema 정의 및 생성

  • 작업: Spark 또는 Trino를 사용하여 AIStor 내에 Iceberg 테이블 껍데기(Schema)를 생성합니다.
  • 설정: Polaris Catalog를 사용하므로, 생성 시 catalog 타입을 polaris로 지정합니다.

Step 3: 데이터 로드 및 변환 (Ingestion to Iceberg)

단순 복사가 아닌 Iceberg의 메타데이터(Manifest, Snapshot)를 생성하며 데이터를 옮깁니다.

  • 방법: INSERT INTO aistor_table SELECT * FROM parquet_raw_files
  • 최적화: 이 과정에서 Partitioning(날짜, 부서 등)과 Sorting(Z-order 등)을 적용하여 향후 쿼리 시 스캔 범위를 최소화합니다.

2. 하부 인프라 및 거버넌스 연계 설계

앞서 논의된 Keycloak, Polaris, AIStor 환경을 녹여낸 통합 이용 방안입니다.

① 권한 및 보안 (Keycloak + Polaris)

  • RBAC 적용: 사용자가 Trino나 Spark에서 AIStor Table에 접근할 때, Keycloak에서 발급받은 토큰이 Polaris로 전달됩니다.
  • Access Control: Polaris는 Iceberg 테이블의 네임스페이스 및 테이블 단위로 권한을 체크하며, 승인된 경우에만 MinIO의 임시 자격 증명을 반환합니다.

② 엔진별 활용 (Trino, Spark, StarRocks)

  • Trino/StarRocks: 실시간 대시보드 및 고속 쿼리에 활용합니다. Iceberg의 Snapshot 기능을 이용해 특정 시점의 데이터로 쿼리(Time Travel)를 수행할 수 있습니다.
  • Spark: 대규모 배치 처리 및 AI 모델 학습용 데이터 전처리에 활용합니다. Iceberg 테이블에서 필요한 증분 데이터(Incremental Read)만 효율적으로 읽어옵니다.

③ 데이터 생애주기 관리 (ILM & Tiering 연계)

  • Hot Tier (NVMe): 최근 생성된 Iceberg 데이터와 메타데이터 파일들을 보관하여 3,000 RPS 수준의 고성능 I/O를 지원합니다.
  • Cold Tier (SATA SSD - Pool 2): Iceberg의 Partition Evolution 기능을 활용합니다. 오래된 날짜의 데이터 파티션 파일들만 Cold Pool로 이동(Tiering)시켜 저장 효율을 극대화합니다.

3. AIStor Table 운영 및 최적화 방안 (DevOps 가이드)

1,000노드 규모에서 AIStor Table의 성능을 유지하기 위한 핵심 운영 전략입니다.

관리 항목최적화 방안 (Best Practice)
Compaction수집 시 발생하는 작은 파티션/파일들을 주기적으로 병합하여 메타데이터 읽기 부하를 줄입니다. (Spark/Trino 활용)
Snapshot 익스파이어Iceberg는 버전 관리를 위해 Snapshot을 유지합니다. 너무 오래된 Snapshot은 삭제하여 스토리지 공간 낭비를 막습니다.
Manifest 관리쿼리 성능 향상을 위해 Manifest 파일의 크기와 개수를 모니터링하고 최적화합니다.
Schema Evolution비즈니스 로직 변경 시 데이터 재작성 없이 컬럼을 추가/수정하여 유연성을 확보합니다.

4. 기대 효과 및 활용 시나리오

  1. 초고속 데이터 탐색: 단순 객체 조회가 아닌 SQL 기반의 고성능 분석이 가능해지며, Iceberg의 메타데이터 필터링 덕분에 스토리지 부하가 획기적으로 줄어듭니다.
  2. 데이터 일관성 (ACID): 여러 엔진(Spark, Trino 등)이 동시에 데이터를 쓰고 읽어도 트랜잭션 보장 덕분에 데이터 오염이 발생하지 않습니다.
  3. Vibe Coding Zone 연계: 개발 환경에서 운영 환경의 특정 시점 Snapshot을 Read-only로 마운트하여 실제 데이터를 기반으로 안전하게 모델링 실험을 수행할 수 있습니다.

설계 제언

"Inventory API로 들어오는 원천 Parquet은 '재료'이고, AIStor Table은 이를 맛있게 요리한 '완성된 요리'입니다. 8월에 도입될 130대의 대규모 Cold Pool을 고려할 때, Partitioning 전략을 지금부터 세밀하게 설계해 두어야 나중에 대규모 데이터 이동 시 시스템 전체의 Latency 튀는 현상을 막을 수 있습니다."

이 과정에서 특정 쿼리 엔진(예: Trino)의 성능을 우선시하시나요? 그렇다면 그에 맞춘 구체적인 Iceberg Partitioning 튜닝 파라미터를 추가로 제안해 드릴 수 있습니다.

===

하부 페이지도 추가로 확인할게.이제 설계를 시작할게. AIStor Tables docs와 Inventory API 내용을 종합해서 전체 파이프라인을 설계할게.


Inventory Parquet → AIStor Iceberg Table 파이프라인 설계

전체 아키텍처

┌─────────────────────────────────────────────────────────────────┐
│                        Hot/Cold MinIO AIStor                     │
│                                                                   │
│  ① mc inventory generate                                         │
│     └── inventory-bucket/YYYY-MM-DD/*.parquet  (원시 데이터)      │
│                                                                   │
│  ② PyIceberg / Spark (ETL)                                       │
│     └── Parquet 읽기 → 스키마 정규화 → AIStor Tables에 쓰기       │
│                                                                   │
│  ③ AIStor Tables (Iceberg REST Catalog)                          │
│     warehouse: infra-analytics                                    │
│     └── namespace: inventory                                      │
│         ├── table: object_inventory     ← 증분 append              │
│         ├── table: tier_summary         ← daily rollup            │
│         └── table: ilm_candidates      ← ILM 대상 선별           │
│                                                                   │
│  ④ 쿼리 클라이언트                                                │
│     Trino / Spark / Dremio / PyIceberg                           │
└─────────────────────────────────────────────────────────────────┘

Step 1 — Inventory 수집 설정 (mc inventory)

# Inventory 설정 생성 (Hot 클러스터 전체 대상)
mc inventory generate HOT \
  --destination HOT/inventory-bucket \
  --schedule "daily"

# 상태 확인
mc inventory list HOT
mc inventory status HOT/<inventory-id>

# 수동 실행 (즉시 수집)
mc inventory resume HOT/<inventory-id>

생성된 Parquet 파일 구조:

inventory-bucket/
  └── YYYY-MM-DD/
      └── <bucket-name>/
          ├── part-00000.parquet   ← 오브젝트 메타 포함
          └── part-00001.parquet

Parquet 컬럼 주요 필드 (Inventory API 기준):

bucket, key, version_id, is_latest, is_delete_marker,
size, last_modified, e_tag, storage_class,
replication_status, encryption_status,
object_lock_mode, object_lock_retain_until_date

Step 2 — AIStor Tables 리소스 구성

2-1. Warehouse / Namespace / Table 생성

# Warehouse 생성 (analytics용)
mc table warehouse create HOT infra-analytics

# Namespace 생성
mc table namespace create HOT infra-analytics inventory

# 메인 테이블: object_inventory (전체 오브젝트 스냅샷)
mc table create HOT infra-analytics inventory object_inventory \
  --schema '{
    "type": "struct",
    "fields": [
      {"id":1,  "name":"snapshot_date",       "type":"date",    "required":true},
      {"id":2,  "name":"cluster",             "type":"string",  "required":true},
      {"id":3,  "name":"bucket",              "type":"string",  "required":true},
      {"id":4,  "name":"key",                 "type":"string",  "required":true},
      {"id":5,  "name":"size_bytes",          "type":"long",    "required":false},
      {"id":6,  "name":"last_modified",       "type":"timestamptz","required":false},
      {"id":7,  "name":"storage_class",       "type":"string",  "required":false},
      {"id":8,  "name":"is_latest",           "type":"boolean", "required":false},
      {"id":9,  "name":"is_delete_marker",    "type":"boolean", "required":false},
      {"id":10, "name":"replication_status",  "type":"string",  "required":false},
      {"id":11, "name":"version_id",          "type":"string",  "required":false},
      {"id":12, "name":"tier",                "type":"string",  "required":false}
    ]
  }'

# ILM 후보 테이블 (전환 대상 분석용)
mc table create HOT infra-analytics inventory ilm_candidates \
  --schema '{
    "type": "struct",
    "fields": [
      {"id":1, "name":"snapshot_date",   "type":"date",   "required":true},
      {"id":2, "name":"bucket",          "type":"string", "required":true},
      {"id":3, "name":"prefix",          "type":"string", "required":false},
      {"id":4, "name":"object_count",    "type":"long",   "required":false},
      {"id":5, "name":"total_size_bytes","type":"long",   "required":false},
      {"id":6, "name":"avg_age_days",    "type":"double", "required":false},
      {"id":7, "name":"storage_class",   "type":"string", "required":false},
      {"id":8, "name":"recommendation",  "type":"string", "required":false}
    ]
  }'

# 테이블 확인
mc table list HOT infra-analytics inventory

Step 3 — ETL: Parquet → AIStor Iceberg Table

PyIceberg 기반 ETL 스크립트

# inventory_to_iceberg.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
from datetime import date
import boto3
import os
from pathlib import Path

# ─────────────────────────────────────────────────────────
# 1. AIStor Iceberg Catalog 연결
# ─────────────────────────────────────────────────────────
catalog = load_catalog(
    "aistor",
    **{
        "uri": "http://hot-aistor.example.com:9000/_iceberg",
        "warehouse": "infra-analytics",
        "rest.sigv4-enabled": "true",
        "rest.signing-name": "s3tables",
        "rest.signing-region": "local",
        "s3.access-key-id": os.environ["MINIO_ACCESS_KEY"],
        "s3.secret-access-key": os.environ["MINIO_SECRET_KEY"],
        "s3.endpoint": "http://hot-aistor.example.com:9000"
    }
)

# ─────────────────────────────────────────────────────────
# 2. 오늘 날짜 Inventory Parquet 파일 수집
# ─────────────────────────────────────────────────────────
s3 = boto3.client(
    's3',
    endpoint_url='http://hot-aistor.example.com:9000',
    aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
    aws_secret_access_key=os.environ["MINIO_SECRET_KEY"]
)

today = date.today()
prefix = f"{today.isoformat()}/"

# inventory-bucket에서 오늘 날짜 parquet 목록 수집
response = s3.list_objects_v2(Bucket='inventory-bucket', Prefix=prefix)
parquet_keys = [
    obj['Key'] for obj in response.get('Contents', [])
    if obj['Key'].endswith('.parquet')
]

# ─────────────────────────────────────────────────────────
# 3. Parquet 읽기 + 스키마 정규화
# ─────────────────────────────────────────────────────────
dfs = []
for key in parquet_keys:
    local_path = f"/tmp/{Path(key).name}"
    s3.download_file('inventory-bucket', key, local_path)
    df = pd.read_parquet(local_path)
    dfs.append(df)

raw_df = pd.concat(dfs, ignore_index=True)

# 컬럼 정규화 및 파생 컬럼 추가
raw_df['snapshot_date'] = today
raw_df['cluster']       = 'hot-primary'
raw_df['tier']          = raw_df['storage_class'].apply(
    lambda x: 'cold' if str(x).startswith('COLD') else 'hot'
)
raw_df['size_bytes']    = raw_df['size'].fillna(0).astype('int64')
raw_df['last_modified'] = pd.to_datetime(raw_df['last_modified'], utc=True)

# 필요 컬럼만 선택
iceberg_df = raw_df[[
    'snapshot_date', 'cluster', 'bucket', 'key',
    'size_bytes', 'last_modified', 'storage_class',
    'is_latest', 'is_delete_marker',
    'replication_status', 'version_id', 'tier'
]]

# ─────────────────────────────────────────────────────────
# 4. PyArrow 변환 후 AIStor Iceberg Table에 Append
# ─────────────────────────────────────────────────────────
table = catalog.load_table("inventory.object_inventory")

arrow_table = pa.Table.from_pandas(iceberg_df)
table.append(arrow_table)

print(f"✅ {len(iceberg_df):,}건 → inventory.object_inventory append 완료")

# ─────────────────────────────────────────────────────────
# 5. ILM 후보 집계 → ilm_candidates 테이블 갱신
# ─────────────────────────────────────────────────────────
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
iceberg_df['age_days'] = (now - iceberg_df['last_modified']).dt.days

# tier=hot & age > 30일 오브젝트만 집계
candidates = (
    iceberg_df[
        (iceberg_df['tier'] == 'hot') &
        (iceberg_df['age_days'] >= 30) &
        (iceberg_df['is_latest'] == True) &
        (iceberg_df['is_delete_marker'] != True)
    ]
    .assign(prefix=lambda df: df['key'].str.split('/').str[0])
    .groupby(['bucket', 'prefix', 'storage_class'])
    .agg(
        object_count=('key', 'count'),
        total_size_bytes=('size_bytes', 'sum'),
        avg_age_days=('age_days', 'mean')
    )
    .reset_index()
)

candidates['snapshot_date'] = today
candidates['recommendation'] = candidates.apply(
    lambda r: 'TRANSITION_TO_COLD' if r['avg_age_days'] >= 90
              else 'MONITOR', axis=1
)

ilm_table = catalog.load_table("inventory.ilm_candidates")
ilm_table.append(pa.Table.from_pandas(candidates))

print(f"✅ {len(candidates):,}건 → inventory.ilm_candidates append 완료")

Spark 기반 ETL (대용량 처리 시)

# spark_inventory_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, datediff, current_date, split

spark = SparkSession.builder \
    .appName("InventoryToIceberg") \
    .config("spark.sql.catalog.aistor", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.aistor.catalog-impl",
            "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.aistor.uri",
            "http://hot-aistor.example.com:9000/_iceberg") \
    .config("spark.sql.catalog.aistor.warehouse", "infra-analytics") \
    .config("spark.sql.catalog.aistor.rest.sigv4-enabled", "true") \
    .config("spark.sql.catalog.aistor.rest.signing-name", "s3tables") \
    .config("spark.sql.catalog.aistor.rest.signing-region", "local") \
    .config("spark.sql.catalog.aistor.rest.access-key-id", "MINIO_ACCESS_KEY") \
    .config("spark.sql.catalog.aistor.rest.secret-access-key", "MINIO_SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.endpoint",
            "http://hot-aistor.example.com:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

TODAY = "2025-05-01"

# 오늘 Inventory Parquet 읽기
raw = spark.read.parquet(
    f"s3a://inventory-bucket/{TODAY}/**/*.parquet"
)

# 정규화
enriched = raw \
    .withColumn("snapshot_date", lit(TODAY).cast("date")) \
    .withColumn("cluster",       lit("hot-primary")) \
    .withColumn("size_bytes",    col("size").cast("long")) \
    .withColumn("tier",
        when(col("storage_class").startswith("COLD"), "cold")
        .otherwise("hot")
    )

# Iceberg Table에 Append
enriched.select(
    "snapshot_date", "cluster", "bucket", "key",
    "size_bytes", "last_modified", "storage_class",
    "is_latest", "is_delete_marker",
    "replication_status", "version_id", "tier"
).writeTo("aistor.inventory.object_inventory").append()

print("✅ object_inventory append 완료")

# ILM 후보 집계
spark.sql("""
    INSERT INTO aistor.inventory.ilm_candidates
    SELECT
        CAST('{today}' AS DATE) AS snapshot_date,
        bucket,
        split(key, '/')[0] AS prefix,
        COUNT(*)             AS object_count,
        SUM(size_bytes)      AS total_size_bytes,
        AVG(datediff(current_date(), DATE(last_modified))) AS avg_age_days,
        storage_class,
        CASE
          WHEN AVG(datediff(current_date(), DATE(last_modified))) >= 90
          THEN 'TRANSITION_TO_COLD'
          ELSE 'MONITOR'
        END AS recommendation
    FROM aistor.inventory.object_inventory
    WHERE snapshot_date = CAST('{today}' AS DATE)
      AND tier = 'hot'
      AND is_latest = true
      AND is_delete_marker = false
      AND datediff(current_date(), DATE(last_modified)) >= 30
    GROUP BY bucket, split(key, '/')[0], storage_class
""".format(today=TODAY))

Step 4 — 활용 쿼리 설계 (Trino 기준)

4-1. Tier별 용량 현황

-- Hot/Cold 용량 및 오브젝트 수 현황
SELECT
    snapshot_date,
    cluster,
    tier,
    storage_class,
    COUNT(*)                          AS object_count,
    SUM(size_bytes) / 1e15            AS size_petabytes,
    AVG(size_bytes) / 1e6             AS avg_size_mb
FROM aistor.inventory.object_inventory
WHERE snapshot_date = CURRENT_DATE
GROUP BY snapshot_date, cluster, tier, storage_class
ORDER BY size_petabytes DESC;

4-2. ILM 전환 우선순위 대상 식별

-- Cold로 내릴 수 있는 Hot 데이터 (30일 이상 미접근 추정)
SELECT
    bucket,
    prefix,
    object_count,
    ROUND(total_size_bytes / 1e12, 2)  AS size_tb,
    ROUND(avg_age_days)                AS avg_age_days,
    recommendation
FROM aistor.inventory.ilm_candidates
WHERE snapshot_date = CURRENT_DATE
  AND recommendation = 'TRANSITION_TO_COLD'
ORDER BY total_size_bytes DESC
LIMIT 50;

4-3. 버킷별 Cold 전환 시뮬레이션

-- 30/60/90일 기준 전환 시 용량 절감 효과
SELECT
    bucket,
    SUM(CASE WHEN age_days >= 30 THEN size_bytes ELSE 0 END) / 1e12 AS cold_30d_tb,
    SUM(CASE WHEN age_days >= 60 THEN size_bytes ELSE 0 END) / 1e12 AS cold_60d_tb,
    SUM(CASE WHEN age_days >= 90 THEN size_bytes ELSE 0 END) / 1e12 AS cold_90d_tb,
    SUM(size_bytes) / 1e12                                           AS total_tb
FROM (
    SELECT
        bucket,
        size_bytes,
        datediff(day, date(last_modified), current_date) AS age_days
    FROM aistor.inventory.object_inventory
    WHERE snapshot_date = CURRENT_DATE
      AND tier = 'hot'
      AND is_latest = true
      AND is_delete_marker = false
)
GROUP BY bucket
ORDER BY cold_90d_tb DESC;

4-4. 날짜별 증가 추이 (Iceberg Time Travel 활용)

-- 주간 용량 증가 트렌드 (Iceberg의 snapshot 이력 활용)
SELECT
    snapshot_date,
    tier,
    SUM(size_bytes) / 1e15 AS total_petabytes
FROM aistor.inventory.object_inventory
WHERE snapshot_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY snapshot_date, tier
ORDER BY snapshot_date, tier;

Step 5 — 주기적 실행 자동화

# crontab 예시: 매일 새벽 3시 실행
# 1. Inventory 생성 → 완료 대기
# 2. ETL 실행 → Iceberg Table append
# 3. 오래된 raw parquet 정리

# /etc/cron.d/minio-inventory-etl
0 3 * * * root /opt/scripts/run_inventory_etl.sh >> /var/log/inventory_etl.log 2>&1
#!/bin/bash
# run_inventory_etl.sh

DATE=$(date +%Y-%m-%d)
echo "[$DATE] Inventory ETL 시작"

# 1. Inventory 완료 확인
mc inventory status HOT/<inventory-id> | grep -q "Completed" || {
  echo "Inventory 아직 미완료, 재시도"
  exit 1
}

# 2. ETL 실행
python3 /opt/scripts/inventory_to_iceberg.py

# 3. 이전 Raw Parquet 정리 (30일 이전)
mc find HOT/inventory-bucket \
  --older-than 30d \
  --name "*.parquet" | xargs -I{} mc rm HOT/{}

echo "[$DATE] ETL 완료"

활용 방안 종합

AIStor Tables는 외부 카탈로그 서비스나 메타데이터 DB 없이 MinIO AIStor 내부에서 직접 Iceberg 테이블을 생성·관리·쿼리할 수 있고, Spark, Trino, Dremio, Starburst 같은 클라이언트에서 Iceberg REST Catalog나 S3 API 양쪽으로 접근 가능하다.

이를 활용한 용도를 구체적으로 정리하면:

활용 분야구체 쿼리 내용의사결정 지원
ILM 정책 수립버킷/prefix별 age 분포 분석30/60/90일 전환 기준 수치 근거 확보
Cold Tier 용량 산정pool0~pool2 수용 가능 용량 vs 전환 대상5월/8월 구축 규모 검증
Hot Tier 공간 확보전환 후 Hot 잔여 용량 예측Hot 클러스터 증설 여부 판단
ILM 전환 모니터링storage_class 변화 추이실제 Cold 전환 검증
Delta Sharing 활용Delta Sharing Protocol로 Iceberg 테이블을 Databricks 등 외부에 공유 가능고객사 보고/대시보드 연동

핵심 주의사항

AIStor Tables는 테이블 위치(location)를 MinIO가 직접 관리하며, 사용자가 커스텀 위치를 지정할 수 없음 → Parquet을 직접 테이블 위치에 복사하는 방식 사용 불가, 반드시 PyIceberg/Spark 등 클라이언트 라이브러리를 통한 append 방식 사용

커밋은 낙관적 동시성 제어(optimistic locking)로 처리되며, 충돌 시 지수 백오프로 재시도 권장 — 병렬 ETL 실행 시 주의

AIStor Tables 기능은 minio RELEASE.2026-02-02T23-40-11Z 이상에서 지원 — 버전 확인 필수

0개의 댓글