"""
AIStor iNVENTORY Parquet Analyzer
====================================
MinIO AIStor iNVENTORY 결과 parquet 파일 컬럼:
Bucket, Key, SequenceNumber, Size, LastModifiedDate
Key 예시:
logs/2024/01/app.log → depth=1: logs/
→ depth=2: logs/2024/
→ depth=3: logs/2024/01/
images/raw/photo.jpg → depth=1: images/
→ depth=2: images/raw/
사용법:
python aistor_inventory_analyzer.py --parquet <경로> --depth <int> [옵션]
옵션:
--parquet, -p parquet 파일 / 디렉토리 / glob 패턴 (필수)
--depth, -d 분석할 prefix depth (필수, --info-only 제외)
--bucket, -b 특정 버킷만 분석
--prefix-filter 특정 prefix 하위만 분석 (예: logs/2024/)
--sort-by size | count | prefix (default: size)
--top 상위 N개만 출력
--output, -o table | csv | json (default: table)
--save, -s 결과 파일 저장 경로
--info-only 스키마·버킷 목록·depth 분포만 출력
"""
import argparse
import glob
import sys
from pathlib import Path
try:
import pandas as pd
except ImportError:
print("[ERROR] pandas가 없습니다: pip install pandas pyarrow")
sys.exit(1)
try:
from tabulate import tabulate
HAS_TABULATE = True
except ImportError:
HAS_TABULATE = False
def human_size(n: float) -> str:
if pd.isna(n) or n == 0:
return "0 B"
for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
if abs(n) < 1024.0:
return f"{n:,.2f} {unit}"
n /= 1024.0
return f"{n:,.2f} EB"
def load_parquets(path: str) -> pd.DataFrame:
p = Path(path)
if p.is_file():
print(f"[INFO] 파일 로드: {p}")
return pd.read_parquet(p)
if p.is_dir():
files = sorted(p.rglob("*.parquet"))
if not files:
sys.exit(f"[ERROR] '{path}' 에 parquet 파일이 없습니다.")
print(f"[INFO] {len(files)}개 parquet 파일 로드 중...")
for f in files:
print(f" · {f}")
return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
files = sorted(glob.glob(path, recursive=True))
if not files:
sys.exit(f"[ERROR] '{path}' 에 해당하는 파일이 없습니다.")
return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
def validate_columns(df: pd.DataFrame) -> None:
required = {"Bucket", "Key", "Size"}
missing = required - set(df.columns)
if missing:
sys.exit(f"[ERROR] 필수 컬럼 누락: {missing}\n현재 컬럼: {list(df.columns)}")
def extract_prefix_at_depth(key: str, depth: int) -> str:
"""
Key를 '/' 로 분리해 depth 단계까지의 prefix 를 반환.
마지막 세그먼트(파일명)는 포함하지 않음.
예) key='logs/2024/01/app.log', depth=1 → 'logs/'
key='logs/2024/01/app.log', depth=2 → 'logs/2024/'
key='logs/2024/01/app.log', depth=3 → 'logs/2024/01/'
key='top-level-file.txt', depth=1 → '(root)'
"""
parts = key.rstrip("/").split("/")
dir_parts = parts[:-1]
if not dir_parts or len(dir_parts) < depth:
return "(root)"
return "/".join(dir_parts[:depth]) + "/"
def add_prefix_column(df: pd.DataFrame, depth: int) -> pd.DataFrame:
df = df.copy()
df["_prefix"] = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), depth))
return df
def analyze(
df: pd.DataFrame,
depth: int,
bucket_filter: str = None,
prefix_filter: str = None,
sort_by: str = "size",
top_n: int = None,
) -> pd.DataFrame:
work = df.copy()
if bucket_filter:
work = work[work["Bucket"] == bucket_filter]
if work.empty:
print(f"[WARN] 버킷 '{bucket_filter}' 에 해당하는 데이터가 없습니다.")
print(f"[INFO] 버킷 목록: {sorted(df['Bucket'].dropna().unique())}")
return pd.DataFrame()
if prefix_filter:
work = work[work["Key"].str.startswith(prefix_filter, na=False)]
if work.empty:
print(f"[WARN] prefix '{prefix_filter}' 에 해당하는 Key가 없습니다.")
return pd.DataFrame()
work = add_prefix_column(work, depth)
result = (
work.groupby(["Bucket", "_prefix"], as_index=False)
.agg(
object_count=("Key", "count"),
total_bytes =("Size", "sum"),
)
.rename(columns={"_prefix": "prefix"})
)
result["avg_object_size"] = (
result["total_bytes"] / result["object_count"].replace(0, float("nan"))
)
total_obj = result["object_count"].sum()
total_bytes = result["total_bytes"].sum()
result["object_pct"] = (result["object_count"] / total_obj * 100).round(2)
result["size_pct"] = (result["total_bytes"] / total_bytes * 100).round(2)
sort_col = {"size": "total_bytes", "count": "object_count", "prefix": "prefix"}[sort_by]
result = result.sort_values(sort_col, ascending=(sort_col == "prefix")).reset_index(drop=True)
if top_n:
result = result.head(top_n)
return result
def print_result(result: pd.DataFrame, depth: int, fmt: str) -> None:
if result.empty:
print("[INFO] 출력할 결과가 없습니다.")
return
total_obj = result["object_count"].sum()
total_bytes = result["total_bytes"].sum()
print("\n" + "=" * 72)
print(f" AIStor iNVENTORY 분석 | depth = {depth}")
print("=" * 72)
print(f" prefix 수 : {len(result):,}개")
print(f" 총 object 수 : {total_obj:,}개")
print(f" 총 데이터 크기 : {human_size(total_bytes)}")
print("=" * 72 + "\n")
if fmt == "json":
print(result.to_json(orient="records", indent=2, force_ascii=False))
return
if fmt == "csv":
print(result.to_csv(index=False))
return
disp = result.assign(
total_size = result["total_bytes"].apply(human_size),
avg_size = result["avg_object_size"].apply(
lambda x: human_size(x) if not pd.isna(x) else "-"),
obj_pct = result["object_pct"].apply(lambda x: f"{x:.1f}%"),
size_pct = result["size_pct"].apply(lambda x: f"{x:.1f}%"),
)[[
"Bucket", "prefix", "object_count", "total_size",
"avg_size", "obj_pct", "size_pct",
]].rename(columns={
"Bucket": "버킷",
"prefix": "Prefix",
"object_count": "객체 수",
"total_size": "총 크기",
"avg_size": "평균 크기",
"obj_pct": "객체 비율",
"size_pct": "크기 비율",
})
if HAS_TABULATE:
print(tabulate(disp, headers="keys", tablefmt="rounded_outline",
showindex=True, numalign="right"))
else:
print(disp.to_string(index=True))
print("\n[TIP] pip install tabulate 로 더 예쁜 출력 가능")
def print_info(df: pd.DataFrame) -> None:
"""--info-only: 스키마 및 depth 분포 미리보기"""
print("\n" + "─" * 54)
print(" 데이터셋 기본 정보")
print("─" * 54)
print(f" 총 object 수 : {len(df):,}")
print(f" 컬럼 : {list(df.columns)}")
print(f" 총 데이터 크기: {human_size(df['Size'].sum())}")
print(f" 버킷 목록 : {sorted(df['Bucket'].dropna().unique().tolist())}")
print("\n depth별 unique prefix 수 (참고용):")
for d in range(1, 7):
n = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), d)).nunique()
print(f" depth={d}: {n:>8,}개 prefix")
print("─" * 54 + "\n")
print("[TIP] --depth 값을 위 표를 참고해 선택하세요.")
def save_result(result: pd.DataFrame, path: str, fmt: str) -> None:
if fmt == "json":
result.to_json(path, orient="records", indent=2, force_ascii=False)
else:
result.to_csv(path, index=False, encoding="utf-8-sig")
print(f"[INFO] 저장 완료: {path}")
def parse_args():
p = argparse.ArgumentParser(
description="AIStor iNVENTORY parquet → depth별 prefix 집계 분석",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
예시:
# 스키마 및 depth 분포 확인 (먼저 실행 권장)
python aistor_inventory_analyzer.py -p ./reports/ --info-only
# depth=2, 크기 순 정렬
python aistor_inventory_analyzer.py -p ./reports/ -d 2
# 특정 버킷, depth=3, Top 30, CSV 저장
python aistor_inventory_analyzer.py -p ./reports/ -d 3 \\
--bucket my-bucket --sort-by size --top 30 --save out.csv
# logs/2024/ 하위만 depth=4 드릴다운, JSON 출력
python aistor_inventory_analyzer.py -p report.parquet -d 4 \\
--prefix-filter logs/2024/ -o json
""",
)
p.add_argument("--parquet", "-p", required=True,
help="parquet 파일 / 디렉토리 / glob 패턴")
p.add_argument("--depth", "-d", type=int,
help="분석할 prefix depth (1부터 시작)")
p.add_argument("--bucket", "-b", default=None,
help="특정 버킷만 분석")
p.add_argument("--prefix-filter", default=None,
help="Key prefix 필터 (예: logs/2024/)")
p.add_argument("--sort-by",
choices=["size", "count", "prefix"], default="size",
help="정렬 기준 (default: size)")
p.add_argument("--top", type=int, default=None,
help="상위 N개만 출력")
p.add_argument("--output", "-o",
choices=["table", "csv", "json"], default="table",
help="출력 형식 (default: table)")
p.add_argument("--save", "-s", default=None,
help="결과 저장 경로 (.csv 또는 .json)")
p.add_argument("--info-only", action="store_true",
help="스키마 정보만 출력하고 종료")
return p.parse_args()
def main():
args = parse_args()
df = load_parquets(args.parquet)
validate_columns(df)
df["Size"] = pd.to_numeric(df["Size"], errors="coerce").fillna(0)
if args.info_only:
print_info(df)
return
if args.depth is None:
sys.exit("[ERROR] --depth 가 필요합니다. 먼저 --info-only 로 depth 분포를 확인하세요.")
result = analyze(
df,
depth=args.depth,
bucket_filter=args.bucket,
prefix_filter=args.prefix_filter,
sort_by=args.sort_by,
top_n=args.top,
)
print_result(result, args.depth, args.output)
if args.save and not result.empty:
save_result(result, args.save, args.output)
if __name__ == "__main__":
main()