MinIO AIStor Tables는 Apache Iceberg 테이블 포맷을 기반으로 하며, 객체 스토리지의 유연성과 관계형 데이터베이스의 강력한 쿼리 성능을 결합한 구조입니다. 제공해주신 문서를 바탕으로 Inventory API로 수집된 Parquet 파일을 Iceberg 기반 AIStor Table로 변환하고, 이를 Polaris 및 다양한 엔진(Trino, Spark)과 연계하여 활용하는 아키텍처를 설계해 드립니다.
Inventory API를 통해 수집된 Parquet 파일은 단순한 '객체' 상태입니다. 이를 고성능 분석이 가능한 'AIStor Table'로 승격시키는 과정입니다.
landing-bucket에 Parquet 포맷으로 저장합니다.catalog 타입을 polaris로 지정합니다.단순 복사가 아닌 Iceberg의 메타데이터(Manifest, Snapshot)를 생성하며 데이터를 옮깁니다.
INSERT INTO aistor_table SELECT * FROM parquet_raw_files앞서 논의된 Keycloak, Polaris, AIStor 환경을 녹여낸 통합 이용 방안입니다.
1,000노드 규모에서 AIStor Table의 성능을 유지하기 위한 핵심 운영 전략입니다.
| 관리 항목 | 최적화 방안 (Best Practice) |
|---|---|
| Compaction | 수집 시 발생하는 작은 파티션/파일들을 주기적으로 병합하여 메타데이터 읽기 부하를 줄입니다. (Spark/Trino 활용) |
| Snapshot 익스파이어 | Iceberg는 버전 관리를 위해 Snapshot을 유지합니다. 너무 오래된 Snapshot은 삭제하여 스토리지 공간 낭비를 막습니다. |
| Manifest 관리 | 쿼리 성능 향상을 위해 Manifest 파일의 크기와 개수를 모니터링하고 최적화합니다. |
| Schema Evolution | 비즈니스 로직 변경 시 데이터 재작성 없이 컬럼을 추가/수정하여 유연성을 확보합니다. |
"Inventory API로 들어오는 원천 Parquet은 '재료'이고, AIStor Table은 이를 맛있게 요리한 '완성된 요리'입니다. 8월에 도입될 130대의 대규모 Cold Pool을 고려할 때, Partitioning 전략을 지금부터 세밀하게 설계해 두어야 나중에 대규모 데이터 이동 시 시스템 전체의 Latency 튀는 현상을 막을 수 있습니다."
이 과정에서 특정 쿼리 엔진(예: Trino)의 성능을 우선시하시나요? 그렇다면 그에 맞춘 구체적인 Iceberg Partitioning 튜닝 파라미터를 추가로 제안해 드릴 수 있습니다.
===
하부 페이지도 추가로 확인할게.이제 설계를 시작할게. AIStor Tables docs와 Inventory API 내용을 종합해서 전체 파이프라인을 설계할게.
┌─────────────────────────────────────────────────────────────────┐
│ Hot/Cold MinIO AIStor │
│ │
│ ① mc inventory generate │
│ └── inventory-bucket/YYYY-MM-DD/*.parquet (원시 데이터) │
│ │
│ ② PyIceberg / Spark (ETL) │
│ └── Parquet 읽기 → 스키마 정규화 → AIStor Tables에 쓰기 │
│ │
│ ③ AIStor Tables (Iceberg REST Catalog) │
│ warehouse: infra-analytics │
│ └── namespace: inventory │
│ ├── table: object_inventory ← 증분 append │
│ ├── table: tier_summary ← daily rollup │
│ └── table: ilm_candidates ← ILM 대상 선별 │
│ │
│ ④ 쿼리 클라이언트 │
│ Trino / Spark / Dremio / PyIceberg │
└─────────────────────────────────────────────────────────────────┘
# Inventory 설정 생성 (Hot 클러스터 전체 대상)
mc inventory generate HOT \
--destination HOT/inventory-bucket \
--schedule "daily"
# 상태 확인
mc inventory list HOT
mc inventory status HOT/<inventory-id>
# 수동 실행 (즉시 수집)
mc inventory resume HOT/<inventory-id>
생성된 Parquet 파일 구조:
inventory-bucket/
└── YYYY-MM-DD/
└── <bucket-name>/
├── part-00000.parquet ← 오브젝트 메타 포함
└── part-00001.parquet
Parquet 컬럼 주요 필드 (Inventory API 기준):
bucket, key, version_id, is_latest, is_delete_marker,
size, last_modified, e_tag, storage_class,
replication_status, encryption_status,
object_lock_mode, object_lock_retain_until_date
# Warehouse 생성 (analytics용)
mc table warehouse create HOT infra-analytics
# Namespace 생성
mc table namespace create HOT infra-analytics inventory
# 메인 테이블: object_inventory (전체 오브젝트 스냅샷)
mc table create HOT infra-analytics inventory object_inventory \
--schema '{
"type": "struct",
"fields": [
{"id":1, "name":"snapshot_date", "type":"date", "required":true},
{"id":2, "name":"cluster", "type":"string", "required":true},
{"id":3, "name":"bucket", "type":"string", "required":true},
{"id":4, "name":"key", "type":"string", "required":true},
{"id":5, "name":"size_bytes", "type":"long", "required":false},
{"id":6, "name":"last_modified", "type":"timestamptz","required":false},
{"id":7, "name":"storage_class", "type":"string", "required":false},
{"id":8, "name":"is_latest", "type":"boolean", "required":false},
{"id":9, "name":"is_delete_marker", "type":"boolean", "required":false},
{"id":10, "name":"replication_status", "type":"string", "required":false},
{"id":11, "name":"version_id", "type":"string", "required":false},
{"id":12, "name":"tier", "type":"string", "required":false}
]
}'
# ILM 후보 테이블 (전환 대상 분석용)
mc table create HOT infra-analytics inventory ilm_candidates \
--schema '{
"type": "struct",
"fields": [
{"id":1, "name":"snapshot_date", "type":"date", "required":true},
{"id":2, "name":"bucket", "type":"string", "required":true},
{"id":3, "name":"prefix", "type":"string", "required":false},
{"id":4, "name":"object_count", "type":"long", "required":false},
{"id":5, "name":"total_size_bytes","type":"long", "required":false},
{"id":6, "name":"avg_age_days", "type":"double", "required":false},
{"id":7, "name":"storage_class", "type":"string", "required":false},
{"id":8, "name":"recommendation", "type":"string", "required":false}
]
}'
# 테이블 확인
mc table list HOT infra-analytics inventory
# inventory_to_iceberg.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
from datetime import date
import boto3
import os
from pathlib import Path
# ─────────────────────────────────────────────────────────
# 1. AIStor Iceberg Catalog 연결
# ─────────────────────────────────────────────────────────
catalog = load_catalog(
"aistor",
**{
"uri": "http://hot-aistor.example.com:9000/_iceberg",
"warehouse": "infra-analytics",
"rest.sigv4-enabled": "true",
"rest.signing-name": "s3tables",
"rest.signing-region": "local",
"s3.access-key-id": os.environ["MINIO_ACCESS_KEY"],
"s3.secret-access-key": os.environ["MINIO_SECRET_KEY"],
"s3.endpoint": "http://hot-aistor.example.com:9000"
}
)
# ─────────────────────────────────────────────────────────
# 2. 오늘 날짜 Inventory Parquet 파일 수집
# ─────────────────────────────────────────────────────────
s3 = boto3.client(
's3',
endpoint_url='http://hot-aistor.example.com:9000',
aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
aws_secret_access_key=os.environ["MINIO_SECRET_KEY"]
)
today = date.today()
prefix = f"{today.isoformat()}/"
# inventory-bucket에서 오늘 날짜 parquet 목록 수집
response = s3.list_objects_v2(Bucket='inventory-bucket', Prefix=prefix)
parquet_keys = [
obj['Key'] for obj in response.get('Contents', [])
if obj['Key'].endswith('.parquet')
]
# ─────────────────────────────────────────────────────────
# 3. Parquet 읽기 + 스키마 정규화
# ─────────────────────────────────────────────────────────
dfs = []
for key in parquet_keys:
local_path = f"/tmp/{Path(key).name}"
s3.download_file('inventory-bucket', key, local_path)
df = pd.read_parquet(local_path)
dfs.append(df)
raw_df = pd.concat(dfs, ignore_index=True)
# 컬럼 정규화 및 파생 컬럼 추가
raw_df['snapshot_date'] = today
raw_df['cluster'] = 'hot-primary'
raw_df['tier'] = raw_df['storage_class'].apply(
lambda x: 'cold' if str(x).startswith('COLD') else 'hot'
)
raw_df['size_bytes'] = raw_df['size'].fillna(0).astype('int64')
raw_df['last_modified'] = pd.to_datetime(raw_df['last_modified'], utc=True)
# 필요 컬럼만 선택
iceberg_df = raw_df[[
'snapshot_date', 'cluster', 'bucket', 'key',
'size_bytes', 'last_modified', 'storage_class',
'is_latest', 'is_delete_marker',
'replication_status', 'version_id', 'tier'
]]
# ─────────────────────────────────────────────────────────
# 4. PyArrow 변환 후 AIStor Iceberg Table에 Append
# ─────────────────────────────────────────────────────────
table = catalog.load_table("inventory.object_inventory")
arrow_table = pa.Table.from_pandas(iceberg_df)
table.append(arrow_table)
print(f"✅ {len(iceberg_df):,}건 → inventory.object_inventory append 완료")
# ─────────────────────────────────────────────────────────
# 5. ILM 후보 집계 → ilm_candidates 테이블 갱신
# ─────────────────────────────────────────────────────────
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
iceberg_df['age_days'] = (now - iceberg_df['last_modified']).dt.days
# tier=hot & age > 30일 오브젝트만 집계
candidates = (
iceberg_df[
(iceberg_df['tier'] == 'hot') &
(iceberg_df['age_days'] >= 30) &
(iceberg_df['is_latest'] == True) &
(iceberg_df['is_delete_marker'] != True)
]
.assign(prefix=lambda df: df['key'].str.split('/').str[0])
.groupby(['bucket', 'prefix', 'storage_class'])
.agg(
object_count=('key', 'count'),
total_size_bytes=('size_bytes', 'sum'),
avg_age_days=('age_days', 'mean')
)
.reset_index()
)
candidates['snapshot_date'] = today
candidates['recommendation'] = candidates.apply(
lambda r: 'TRANSITION_TO_COLD' if r['avg_age_days'] >= 90
else 'MONITOR', axis=1
)
ilm_table = catalog.load_table("inventory.ilm_candidates")
ilm_table.append(pa.Table.from_pandas(candidates))
print(f"✅ {len(candidates):,}건 → inventory.ilm_candidates append 완료")
# spark_inventory_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, datediff, current_date, split
spark = SparkSession.builder \
.appName("InventoryToIceberg") \
.config("spark.sql.catalog.aistor", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.aistor.catalog-impl",
"org.apache.iceberg.rest.RESTCatalog") \
.config("spark.sql.catalog.aistor.uri",
"http://hot-aistor.example.com:9000/_iceberg") \
.config("spark.sql.catalog.aistor.warehouse", "infra-analytics") \
.config("spark.sql.catalog.aistor.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.aistor.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.aistor.rest.signing-region", "local") \
.config("spark.sql.catalog.aistor.rest.access-key-id", "MINIO_ACCESS_KEY") \
.config("spark.sql.catalog.aistor.rest.secret-access-key", "MINIO_SECRET_KEY") \
.config("spark.hadoop.fs.s3a.endpoint",
"http://hot-aistor.example.com:9000") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.getOrCreate()
TODAY = "2025-05-01"
# 오늘 Inventory Parquet 읽기
raw = spark.read.parquet(
f"s3a://inventory-bucket/{TODAY}/**/*.parquet"
)
# 정규화
enriched = raw \
.withColumn("snapshot_date", lit(TODAY).cast("date")) \
.withColumn("cluster", lit("hot-primary")) \
.withColumn("size_bytes", col("size").cast("long")) \
.withColumn("tier",
when(col("storage_class").startswith("COLD"), "cold")
.otherwise("hot")
)
# Iceberg Table에 Append
enriched.select(
"snapshot_date", "cluster", "bucket", "key",
"size_bytes", "last_modified", "storage_class",
"is_latest", "is_delete_marker",
"replication_status", "version_id", "tier"
).writeTo("aistor.inventory.object_inventory").append()
print("✅ object_inventory append 완료")
# ILM 후보 집계
spark.sql("""
INSERT INTO aistor.inventory.ilm_candidates
SELECT
CAST('{today}' AS DATE) AS snapshot_date,
bucket,
split(key, '/')[0] AS prefix,
COUNT(*) AS object_count,
SUM(size_bytes) AS total_size_bytes,
AVG(datediff(current_date(), DATE(last_modified))) AS avg_age_days,
storage_class,
CASE
WHEN AVG(datediff(current_date(), DATE(last_modified))) >= 90
THEN 'TRANSITION_TO_COLD'
ELSE 'MONITOR'
END AS recommendation
FROM aistor.inventory.object_inventory
WHERE snapshot_date = CAST('{today}' AS DATE)
AND tier = 'hot'
AND is_latest = true
AND is_delete_marker = false
AND datediff(current_date(), DATE(last_modified)) >= 30
GROUP BY bucket, split(key, '/')[0], storage_class
""".format(today=TODAY))
-- Hot/Cold 용량 및 오브젝트 수 현황
SELECT
snapshot_date,
cluster,
tier,
storage_class,
COUNT(*) AS object_count,
SUM(size_bytes) / 1e15 AS size_petabytes,
AVG(size_bytes) / 1e6 AS avg_size_mb
FROM aistor.inventory.object_inventory
WHERE snapshot_date = CURRENT_DATE
GROUP BY snapshot_date, cluster, tier, storage_class
ORDER BY size_petabytes DESC;
-- Cold로 내릴 수 있는 Hot 데이터 (30일 이상 미접근 추정)
SELECT
bucket,
prefix,
object_count,
ROUND(total_size_bytes / 1e12, 2) AS size_tb,
ROUND(avg_age_days) AS avg_age_days,
recommendation
FROM aistor.inventory.ilm_candidates
WHERE snapshot_date = CURRENT_DATE
AND recommendation = 'TRANSITION_TO_COLD'
ORDER BY total_size_bytes DESC
LIMIT 50;
-- 30/60/90일 기준 전환 시 용량 절감 효과
SELECT
bucket,
SUM(CASE WHEN age_days >= 30 THEN size_bytes ELSE 0 END) / 1e12 AS cold_30d_tb,
SUM(CASE WHEN age_days >= 60 THEN size_bytes ELSE 0 END) / 1e12 AS cold_60d_tb,
SUM(CASE WHEN age_days >= 90 THEN size_bytes ELSE 0 END) / 1e12 AS cold_90d_tb,
SUM(size_bytes) / 1e12 AS total_tb
FROM (
SELECT
bucket,
size_bytes,
datediff(day, date(last_modified), current_date) AS age_days
FROM aistor.inventory.object_inventory
WHERE snapshot_date = CURRENT_DATE
AND tier = 'hot'
AND is_latest = true
AND is_delete_marker = false
)
GROUP BY bucket
ORDER BY cold_90d_tb DESC;
-- 주간 용량 증가 트렌드 (Iceberg의 snapshot 이력 활용)
SELECT
snapshot_date,
tier,
SUM(size_bytes) / 1e15 AS total_petabytes
FROM aistor.inventory.object_inventory
WHERE snapshot_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY snapshot_date, tier
ORDER BY snapshot_date, tier;
# crontab 예시: 매일 새벽 3시 실행
# 1. Inventory 생성 → 완료 대기
# 2. ETL 실행 → Iceberg Table append
# 3. 오래된 raw parquet 정리
# /etc/cron.d/minio-inventory-etl
0 3 * * * root /opt/scripts/run_inventory_etl.sh >> /var/log/inventory_etl.log 2>&1
#!/bin/bash
# run_inventory_etl.sh
DATE=$(date +%Y-%m-%d)
echo "[$DATE] Inventory ETL 시작"
# 1. Inventory 완료 확인
mc inventory status HOT/<inventory-id> | grep -q "Completed" || {
echo "Inventory 아직 미완료, 재시도"
exit 1
}
# 2. ETL 실행
python3 /opt/scripts/inventory_to_iceberg.py
# 3. 이전 Raw Parquet 정리 (30일 이전)
mc find HOT/inventory-bucket \
--older-than 30d \
--name "*.parquet" | xargs -I{} mc rm HOT/{}
echo "[$DATE] ETL 완료"
AIStor Tables는 외부 카탈로그 서비스나 메타데이터 DB 없이 MinIO AIStor 내부에서 직접 Iceberg 테이블을 생성·관리·쿼리할 수 있고, Spark, Trino, Dremio, Starburst 같은 클라이언트에서 Iceberg REST Catalog나 S3 API 양쪽으로 접근 가능하다.
이를 활용한 용도를 구체적으로 정리하면:
| 활용 분야 | 구체 쿼리 내용 | 의사결정 지원 |
|---|---|---|
| ILM 정책 수립 | 버킷/prefix별 age 분포 분석 | 30/60/90일 전환 기준 수치 근거 확보 |
| Cold Tier 용량 산정 | pool0~pool2 수용 가능 용량 vs 전환 대상 | 5월/8월 구축 규모 검증 |
| Hot Tier 공간 확보 | 전환 후 Hot 잔여 용량 예측 | Hot 클러스터 증설 여부 판단 |
| ILM 전환 모니터링 | storage_class 변화 추이 | 실제 Cold 전환 검증 |
| Delta Sharing 활용 | Delta Sharing Protocol로 Iceberg 테이블을 Databricks 등 외부에 공유 가능 | 고객사 보고/대시보드 연동 |
AIStor Tables는 테이블 위치(location)를 MinIO가 직접 관리하며, 사용자가 커스텀 위치를 지정할 수 없음 → Parquet을 직접 테이블 위치에 복사하는 방식 사용 불가, 반드시 PyIceberg/Spark 등 클라이언트 라이브러리를 통한 append 방식 사용
커밋은 낙관적 동시성 제어(optimistic locking)로 처리되며, 충돌 시 지수 백오프로 재시도 권장 — 병렬 ETL 실행 시 주의
AIStor Tables 기능은
minio RELEASE.2026-02-02T23-40-11Z이상에서 지원 — 버전 확인 필수