"""
AIStor iNVENTORY Analyzer (원격 직접 읽기 버전)
=================================================
MinIO AIStor 버킷에 쌓인 iNVENTORY 결과를
로컬 복사 없이 Python에서 직접 스트리밍하여 분석합니다.
저장 경로 구조:
<DEST_BUCKET>/<PREFIX>/<SOURCE_BUCKET>/<INVENTORY_ID>/<TIMESTAMP>/
manifest.json
files/
file-001.parquet
file-002.parquet
...
parquet 컬럼:
Bucket, Key, SequenceNumber, Size, LastModifiedDate
사용법:
# 기본 (최신 timestamp 자동 선택)
python aistor_inventory_analyzer.py \\
--endpoint play.min.io:9000 \\
--access-key Q3AM3UQ867SPQQA43P2F \\
--secret-key zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG \\
--dest-bucket inventory-reports \\
--source-bucket devops-test \\
--inventory-id my-job-id \\
--depth 2
# .env 파일 또는 환경변수로 인증 정보 관리 (권장)
export AISTOR_ENDPOINT=play.min.io:9000
export AISTOR_ACCESS_KEY=...
export AISTOR_SECRET_KEY=...
python aistor_inventory_analyzer.py \\
--dest-bucket inventory-reports \\
--source-bucket devops-test \\
--inventory-id my-job-id \\
--depth 2
# 특정 timestamp 지정 (생략 시 최신 자동 선택)
python aistor_inventory_analyzer.py ... --timestamp 2025-01-15T03-00Z
# 구조 확인만 (분석 없이 어떤 job/timestamp가 있는지 확인)
python aistor_inventory_analyzer.py ... --list-jobs
python aistor_inventory_analyzer.py ... --info-only
"""
import argparse
import io
import json
import os
import sys
try:
import pandas as pd
import pyarrow.parquet as pq
from minio import Minio
from minio.error import S3Error
except ImportError as e:
print(f"[ERROR] 필수 패키지 부족: {e}")
print(" pip install minio pyarrow pandas tabulate")
sys.exit(1)
try:
from tabulate import tabulate
HAS_TABULATE = True
except ImportError:
HAS_TABULATE = False
def human_size(n: float) -> str:
if pd.isna(n) or n == 0:
return "0 B"
for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
if abs(n) < 1024.0:
return f"{n:,.2f} {unit}"
n /= 1024.0
return f"{n:,.2f} EB"
def make_client(endpoint: str, access_key: str, secret_key: str, secure: bool) -> Minio:
return Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure,
)
def list_jobs(client: Minio, dest_bucket: str, prefix: str,
source_bucket: str) -> dict[str, list[str]]:
"""
dest_bucket 안에서 source_bucket 하위의 inventory job ID와
각 job의 timestamp 목록을 반환합니다.
탐색 경로: <prefix>/<source_bucket>/<job_id>/<timestamp>/manifest.json
"""
base = f"{prefix}/{source_bucket}/" if prefix else f"{source_bucket}/"
base = base.lstrip("/")
jobs: dict[str, list[str]] = {}
try:
objects = client.list_objects(dest_bucket, prefix=base, recursive=True)
for obj in objects:
key = obj.object_name
if not key.endswith("manifest.json"):
continue
rel = key[len(base):]
parts = rel.split("/")
if len(parts) >= 3:
job_id, timestamp = parts[0], parts[1]
jobs.setdefault(job_id, []).append(timestamp)
except S3Error as e:
sys.exit(f"[ERROR] 버킷 조회 실패: {e}")
for jid in jobs:
jobs[jid] = sorted(jobs[jid], reverse=True)
return jobs
def resolve_manifest_key(prefix: str, source_bucket: str,
inventory_id: str, timestamp: str) -> str:
"""manifest.json의 오브젝트 키 조립"""
parts = [p for p in [prefix, source_bucket, inventory_id, timestamp, "manifest.json"] if p]
return "/".join(parts)
def read_manifest(client: Minio, dest_bucket: str, manifest_key: str) -> dict:
"""manifest.json 읽기"""
try:
resp = client.get_object(dest_bucket, manifest_key)
data = json.loads(resp.read().decode("utf-8"))
resp.close()
return data
except S3Error as e:
sys.exit(f"[ERROR] manifest 읽기 실패 ({manifest_key}): {e}")
def resolve_parquet_keys(manifest: dict, dest_bucket: str,
prefix: str, source_bucket: str,
inventory_id: str, timestamp: str) -> list[str]:
"""
manifest의 files 목록에서 parquet 오브젝트 키 목록을 반환.
manifest의 key 필드가 절대경로면 그대로, 상대경로면 base_path 를 앞에 붙임.
"""
files = manifest.get("files", [])
if not files:
sys.exit("[ERROR] manifest에 files 항목이 없습니다.")
keys = []
base = "/".join(p for p in [prefix, source_bucket, inventory_id, timestamp, "files"] if p)
for f in files:
raw_key = f["key"] if isinstance(f, dict) else str(f)
if raw_key.startswith(dest_bucket + "/"):
raw_key = raw_key[len(dest_bucket) + 1:]
if not raw_key.startswith(source_bucket) and not raw_key.startswith(prefix or source_bucket):
obj_key = f"{base}/{raw_key.lstrip('/')}"
else:
obj_key = raw_key
keys.append(obj_key)
return keys
def stream_parquet(client: Minio, dest_bucket: str, object_key: str) -> pd.DataFrame:
"""
MinIO에서 parquet 파일을 메모리로 스트리밍하여 DataFrame 반환.
256 MB 파일도 청크 없이 pyarrow가 처리 (columnar 포맷 특성상 효율적).
"""
try:
resp = client.get_object(dest_bucket, object_key)
buf = io.BytesIO(resp.read())
resp.close()
except S3Error as e:
print(f" [WARN] 파일 읽기 실패, 건너뜀 ({object_key}): {e}")
return pd.DataFrame()
try:
table = pq.read_table(buf)
return table.to_pandas()
except Exception as e:
print(f" [WARN] parquet 파싱 실패, 건너뜀 ({object_key}): {e}")
return pd.DataFrame()
def load_all_parquets(client: Minio, dest_bucket: str,
parquet_keys: list[str]) -> pd.DataFrame:
"""모든 parquet 파일을 순차 스트리밍 후 병합"""
dfs = []
total = len(parquet_keys)
for i, key in enumerate(parquet_keys, 1):
fname = key.split("/")[-1]
print(f" [{i}/{total}] {fname} 읽는 중...", end=" ", flush=True)
df = stream_parquet(client, dest_bucket, key)
if not df.empty:
print(f"{len(df):,} rows")
dfs.append(df)
else:
print("(건너뜀)")
if not dfs:
sys.exit("[ERROR] 읽을 수 있는 parquet 파일이 없습니다.")
result = pd.concat(dfs, ignore_index=True)
print(f"\n[INFO] 총 {len(result):,} rows 로드 완료")
return result
def extract_prefix_at_depth(key: str, depth: int) -> str:
"""
Key의 디렉토리 경로를 depth 단계까지 잘라 prefix 반환.
예) key='logs/2024/01/app.log', depth=2 → 'logs/2024/'
key='top-level.txt', depth=1 → '(root)'
"""
parts = key.rstrip("/").split("/")
dir_parts = parts[:-1]
if len(dir_parts) < depth:
return "(root)"
return "/".join(dir_parts[:depth]) + "/"
def analyze(df: pd.DataFrame, depth: int,
bucket_filter: str = None,
prefix_filter: str = None,
sort_by: str = "size",
top_n: int = None) -> pd.DataFrame:
work = df.copy()
if bucket_filter:
work = work[work["Bucket"] == bucket_filter]
if work.empty:
print(f"[WARN] 버킷 '{bucket_filter}' 데이터 없음")
print(f"[INFO] 버킷 목록: {sorted(df['Bucket'].dropna().unique())}")
return pd.DataFrame()
if prefix_filter:
work = work[work["Key"].str.startswith(prefix_filter, na=False)]
if work.empty:
print(f"[WARN] prefix '{prefix_filter}' 에 해당하는 Key 없음")
return pd.DataFrame()
work["_prefix"] = work["Key"].apply(lambda k: extract_prefix_at_depth(str(k), depth))
result = (
work.groupby(["Bucket", "_prefix"], as_index=False)
.agg(object_count=("Key", "count"), total_bytes=("Size", "sum"))
.rename(columns={"_prefix": "prefix"})
)
result["avg_object_size"] = (
result["total_bytes"] / result["object_count"].replace(0, float("nan"))
)
tot_obj = result["object_count"].sum()
tot_bytes = result["total_bytes"].sum()
result["object_pct"] = (result["object_count"] / tot_obj * 100).round(2)
result["size_pct"] = (result["total_bytes"] / tot_bytes * 100).round(2)
sort_col = {"size": "total_bytes", "count": "object_count", "prefix": "prefix"}[sort_by]
result = result.sort_values(sort_col, ascending=(sort_col == "prefix")).reset_index(drop=True)
if top_n:
result = result.head(top_n)
return result
def print_result(result: pd.DataFrame, depth: int, fmt: str) -> None:
if result.empty:
print("[INFO] 출력할 결과가 없습니다.")
return
tot_obj = result["object_count"].sum()
tot_bytes = result["total_bytes"].sum()
print("\n" + "=" * 72)
print(f" AIStor iNVENTORY 분석 | depth = {depth}")
print("=" * 72)
print(f" prefix 수 : {len(result):,}개")
print(f" 총 object 수 : {tot_obj:,}개")
print(f" 총 데이터 크기 : {human_size(tot_bytes)}")
print("=" * 72 + "\n")
if fmt == "json":
print(result.to_json(orient="records", indent=2, force_ascii=False))
return
if fmt == "csv":
print(result.to_csv(index=False))
return
disp = result.assign(
total_size = result["total_bytes"].apply(human_size),
avg_size = result["avg_object_size"].apply(
lambda x: human_size(x) if not pd.isna(x) else "-"),
obj_pct = result["object_pct"].apply(lambda x: f"{x:.1f}%"),
size_pct = result["size_pct"].apply(lambda x: f"{x:.1f}%"),
)[[
"Bucket", "prefix", "object_count", "total_size", "avg_size", "obj_pct", "size_pct",
]].rename(columns={
"Bucket": "버킷", "prefix": "Prefix", "object_count": "객체 수",
"total_size": "총 크기", "avg_size": "평균 크기",
"obj_pct": "객체 비율", "size_pct": "크기 비율",
})
if HAS_TABULATE:
print(tabulate(disp, headers="keys", tablefmt="rounded_outline",
showindex=True, numalign="right"))
else:
print(disp.to_string(index=True))
print("\n[TIP] pip install tabulate → 더 예쁜 테이블 출력")
def print_info(df: pd.DataFrame) -> None:
print("\n" + "─" * 54)
print(" 데이터셋 기본 정보")
print("─" * 54)
print(f" 총 object 수 : {len(df):,}")
print(f" 컬럼 : {list(df.columns)}")
print(f" 총 데이터 크기: {human_size(df['Size'].sum())}")
print(f" 버킷 목록 : {sorted(df['Bucket'].dropna().unique().tolist())}")
print("\n depth별 unique prefix 수:")
for d in range(1, 7):
n = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), d)).nunique()
print(f" depth={d}: {n:>8,}개 prefix")
print("─" * 54)
print("[TIP] --depth 값을 위 표를 참고해 선택하세요.\n")
def save_result(result: pd.DataFrame, path: str, fmt: str) -> None:
if fmt == "json":
result.to_json(path, orient="records", indent=2, force_ascii=False)
else:
result.to_csv(path, index=False, encoding="utf-8-sig")
print(f"[INFO] 저장 완료: {path}")
def parse_args():
p = argparse.ArgumentParser(
description="AIStor iNVENTORY parquet를 원격에서 직접 읽어 depth별 prefix 집계",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
환경변수 (--access-key/--secret-key 대신 사용 가능):
AISTOR_ENDPOINT MinIO endpoint (예: minio.example.com:9000)
AISTOR_ACCESS_KEY 액세스 키
AISTOR_SECRET_KEY 시크릿 키
예시:
# job 목록과 timestamp 확인
python aistor_inventory_analyzer.py \\
--endpoint minio.example.com:9000 \\
--access-key AKID --secret-key SKID \\
--dest-bucket inventory-reports --source-bucket devops-test \\
--list-jobs
# 최신 timestamp 자동 선택, depth=2 분석
python aistor_inventory_analyzer.py \\
--endpoint minio.example.com:9000 \\
--access-key AKID --secret-key SKID \\
--dest-bucket inventory-reports --source-bucket devops-test \\
--inventory-id my-job-id --depth 2
# 특정 timestamp 지정, Top 30, CSV 저장
python aistor_inventory_analyzer.py \\
--endpoint minio.example.com:9000 \\
--access-key AKID --secret-key SKID \\
--dest-bucket inventory-reports --source-bucket devops-test \\
--inventory-id my-job-id \\
--timestamp 2025-01-15T03-00Z \\
--depth 3 --sort-by size --top 30 --save result.csv
# prefix 드릴다운 + JSON 출력
python aistor_inventory_analyzer.py ... \\
--inventory-id my-job-id --depth 4 \\
--prefix-filter logs/2024/ -o json
""",
)
conn = p.add_argument_group("MinIO 연결 정보")
conn.add_argument("--endpoint", "-e",
default=os.environ.get("AISTOR_ENDPOINT"),
help="MinIO endpoint (예: minio.example.com:9000)")
conn.add_argument("--access-key",
default=os.environ.get("AISTOR_ACCESS_KEY"),
help="액세스 키 (환경변수 AISTOR_ACCESS_KEY 도 가능)")
conn.add_argument("--secret-key",
default=os.environ.get("AISTOR_SECRET_KEY"),
help="시크릿 키 (환경변수 AISTOR_SECRET_KEY 도 가능)")
conn.add_argument("--no-tls", action="store_true",
help="TLS 비활성화 (http 접속)")
path = p.add_argument_group("inventory 경로")
path.add_argument("--dest-bucket", required=True,
help="inventory 결과가 저장된 버킷명")
path.add_argument("--prefix", default="",
help="dest-bucket 안의 prefix (설정 YAML의 destination.prefix, 없으면 생략)")
path.add_argument("--source-bucket", required=True,
help="인벤토리를 생성한 원본 버킷명")
path.add_argument("--inventory-id",
help="inventory job ID (--list-jobs 생략 시 필수)")
path.add_argument("--timestamp", default=None,
help="특정 실행 timestamp (예: 2025-01-15T03-00Z). 생략 시 최신 자동 선택")
ana = p.add_argument_group("분석 옵션")
ana.add_argument("--depth", "-d", type=int,
help="분석할 prefix depth (1~)")
ana.add_argument("--bucket-filter", "-b", default=None,
help="특정 Bucket 컬럼 값으로 필터")
ana.add_argument("--prefix-filter", default=None,
help="Key prefix 필터 (예: logs/2024/)")
ana.add_argument("--sort-by",
choices=["size", "count", "prefix"], default="size")
ana.add_argument("--top", type=int, default=None,
help="상위 N개만 출력")
ana.add_argument("--output", "-o",
choices=["table", "csv", "json"], default="table")
ana.add_argument("--save", "-s", default=None,
help="결과 저장 경로")
p.add_argument("--list-jobs", action="store_true",
help="job ID / timestamp 목록만 출력하고 종료")
p.add_argument("--info-only", action="store_true",
help="데이터 로드 후 스키마/depth 분포만 출력")
return p.parse_args()
def main():
args = parse_args()
if not args.endpoint:
sys.exit("[ERROR] --endpoint 또는 환경변수 AISTOR_ENDPOINT 가 필요합니다.")
if not args.access_key or not args.secret_key:
sys.exit("[ERROR] --access-key / --secret-key 또는 환경변수가 필요합니다.")
client = make_client(
args.endpoint,
args.access_key,
args.secret_key,
secure=not args.no_tls,
)
if args.list_jobs:
print(f"\n[INFO] '{args.dest_bucket}' 버킷의 inventory job 목록 조회 중...")
jobs = list_jobs(client, args.dest_bucket, args.prefix, args.source_bucket)
if not jobs:
print("[INFO] inventory job을 찾을 수 없습니다.")
return
print(f"\n{'Job ID':<40} {'Timestamp (최신순)'}")
print("─" * 72)
for jid, timestamps in sorted(jobs.items()):
for i, ts in enumerate(timestamps):
label = jid if i == 0 else ""
latest = " ← 최신" if i == 0 else ""
print(f"{label:<40} {ts}{latest}")
print()
return
if not args.inventory_id:
sys.exit("[ERROR] --inventory-id 가 필요합니다. --list-jobs 로 목록을 먼저 확인하세요.")
timestamp = args.timestamp
if not timestamp:
print("[INFO] --timestamp 미지정 → 최신 timestamp 자동 선택 중...")
jobs = list_jobs(client, args.dest_bucket, args.prefix, args.source_bucket)
ts_list = jobs.get(args.inventory_id, [])
if not ts_list:
sys.exit(f"[ERROR] inventory-id '{args.inventory_id}' 를 찾을 수 없습니다.")
timestamp = ts_list[0]
print(f"[INFO] 선택된 timestamp: {timestamp}")
manifest_key = resolve_manifest_key(
args.prefix, args.source_bucket, args.inventory_id, timestamp
)
print(f"[INFO] manifest 읽기: {args.dest_bucket}/{manifest_key}")
manifest = read_manifest(client, args.dest_bucket, manifest_key)
fmt = manifest.get("fileFormat", "parquet").lower()
if fmt != "parquet":
sys.exit(f"[ERROR] 이 도구는 parquet 포맷만 지원합니다. (현재 포맷: {fmt})")
print(f"[INFO] sourceBucket : {manifest.get('sourceBucket', '?')}")
print(f"[INFO] inventoryId : {manifest.get('inventoryId', '?')}")
print(f"[INFO] createdAt : {manifest.get('createdAt', '?')}")
file_count = len(manifest.get("files", []))
print(f"[INFO] parquet 파일 : {file_count}개\n")
parquet_keys = resolve_parquet_keys(
manifest, args.dest_bucket,
args.prefix, args.source_bucket,
args.inventory_id, timestamp,
)
df = load_all_parquets(client, args.dest_bucket, parquet_keys)
df["Size"] = pd.to_numeric(df["Size"], errors="coerce").fillna(0)
if args.info_only:
print_info(df)
return
if not args.depth:
sys.exit("[ERROR] --depth 가 필요합니다. --info-only 로 depth 분포를 먼저 확인하세요.")
result = analyze(
df,
depth=args.depth,
bucket_filter=args.bucket_filter,
prefix_filter=args.prefix_filter,
sort_by=args.sort_by,
top_n=args.top,
)
print_result(result, args.depth, args.output)
if args.save and not result.empty:
save_result(result, args.save, args.output)
if __name__ == "__main__":
main()