"""
AIStor iNVENTORY Parquet Analyzer
====================================
MinIO AIStor iNVENTORY API로 생성된 parquet 파일을
특정 depth의 prefix별로 집계하여 분석하는 도구입니다.
사용법:
python aistor_inventory_analyzer.py \
--parquet <parquet_file_or_dir> \
--depth <int> \
[--bucket <bucket_name>] \
[--prefix-filter <prefix>] \
[--sort-by size|count|prefix] \
[--top <int>] \
[--output <csv|table|json>] \
[--save <output_file>]
AIStor iNVENTORY 설정 예시 (YAML):
apiVersion: v1
id: my-inventory-job
destination:
bucket: inventory-reports
prefix: reports/
format: parquet
compression: on
schedule: daily
mode: fast
versions: current
mc 명령어로 inventory 실행:
# 설정 파일 생성
mc inventory generate ALIAS/SOURCE_BUCKET > inventory-config.yaml
# 설정 적용 및 job 시작
mc inventory add ALIAS/SOURCE_BUCKET inventory-config.yaml
# 상태 확인
mc inventory status ALIAS/SOURCE_BUCKET --id my-inventory-job
# 결과 parquet 다운로드
mc cp --recursive ALIAS/inventory-reports/reports/ ./local-reports/
"""
import argparse
import glob
import json
import os
import sys
from pathlib import Path
try:
import pandas as pd
import pyarrow.parquet as pq
except ImportError:
print("[ERROR] 필수 패키지가 없습니다. 아래 명령어로 설치하세요:")
print(" pip install pandas pyarrow tabulate")
sys.exit(1)
try:
from tabulate import tabulate
HAS_TABULATE = True
except ImportError:
HAS_TABULATE = False
def human_readable_size(size_bytes: float) -> str:
"""바이트를 읽기 쉬운 단위로 변환"""
if pd.isna(size_bytes) or size_bytes == 0:
return "0 B"
for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
if abs(size_bytes) < 1024.0:
return f"{size_bytes:,.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:,.2f} EB"
def load_parquet_files(path: str) -> pd.DataFrame:
"""단일 파일 또는 디렉토리의 모든 parquet 파일을 로드"""
p = Path(path)
if p.is_file() and p.suffix == ".parquet":
print(f"[INFO] 파일 로드: {p}")
return pd.read_parquet(p)
if p.is_dir():
files = sorted(glob.glob(str(p / "**" / "*.parquet"), recursive=True))
if not files:
files = sorted(glob.glob(str(p / "*.parquet")))
if not files:
print(f"[ERROR] '{path}' 디렉토리에서 parquet 파일을 찾을 수 없습니다.")
sys.exit(1)
print(f"[INFO] {len(files)}개 parquet 파일 로드 중...")
dfs = []
for f in files:
print(f" - {f}")
dfs.append(pd.read_parquet(f))
return pd.concat(dfs, ignore_index=True)
files = sorted(glob.glob(path, recursive=True))
if not files:
print(f"[ERROR] '{path}' 에서 parquet 파일을 찾을 수 없습니다.")
sys.exit(1)
print(f"[INFO] glob 패턴으로 {len(files)}개 파일 로드 중...")
dfs = [pd.read_parquet(f) for f in files]
return pd.concat(dfs, ignore_index=True)
def validate_columns(df: pd.DataFrame) -> None:
"""필수 컬럼 존재 여부 확인"""
required = {"bucket", "prefix", "depth", "object_count", "total_bytes"}
missing = required - set(df.columns)
if missing:
print(f"[ERROR] 필수 컬럼 누락: {missing}")
print(f"[INFO] 현재 컬럼: {list(df.columns)}")
sys.exit(1)
def analyze_by_depth(
df: pd.DataFrame,
target_depth: int,
bucket_filter: str = None,
prefix_filter: str = None,
sort_by: str = "size",
top_n: int = None,
) -> pd.DataFrame:
"""
특정 depth의 prefix별로 object_count와 total_bytes를 집계합니다.
AIStor iNVENTORY parquet의 depth는 prefix의 '/' 구분자 깊이를 나타냅니다.
예: depth=1 → 'logs/', depth=2 → 'logs/2024/', depth=3 → 'logs/2024/01/'
"""
filtered = df.copy()
if bucket_filter:
filtered = filtered[filtered["bucket"] == bucket_filter]
if filtered.empty:
print(f"[WARN] 버킷 '{bucket_filter}'에 해당하는 데이터가 없습니다.")
return pd.DataFrame()
if prefix_filter:
mask = filtered["prefix"].str.startswith(prefix_filter, na=False)
filtered = filtered[mask]
if filtered.empty:
print(f"[WARN] prefix '{prefix_filter}'에 해당하는 데이터가 없습니다.")
return pd.DataFrame()
depth_df = filtered[filtered["depth"] == target_depth].copy()
if depth_df.empty:
available_depths = sorted(filtered["depth"].dropna().unique().tolist())
print(f"[WARN] depth={target_depth}에 해당하는 데이터가 없습니다.")
print(f"[INFO] 사용 가능한 depth 값: {available_depths}")
return pd.DataFrame()
result = (
depth_df.groupby(["bucket", "prefix"], as_index=False)
.agg(
object_count=("object_count", "sum"),
total_bytes=("total_bytes", "sum"),
)
)
result["avg_object_size"] = (
result["total_bytes"] / result["object_count"].replace(0, float("nan"))
)
total_objects = result["object_count"].sum()
total_bytes = result["total_bytes"].sum()
result["object_pct"] = (result["object_count"] / total_objects * 100).round(2)
result["size_pct"] = (result["total_bytes"] / total_bytes * 100).round(2)
sort_map = {
"size": "total_bytes",
"count": "object_count",
"prefix": "prefix",
}
sort_col = sort_map.get(sort_by, "total_bytes")
ascending = sort_col == "prefix"
result = result.sort_values(sort_col, ascending=ascending).reset_index(drop=True)
if top_n:
result = result.head(top_n)
return result
def print_summary(df_result: pd.DataFrame, target_depth: int, output_format: str) -> None:
"""분석 결과 출력"""
if df_result.empty:
print("[INFO] 출력할 결과가 없습니다.")
return
total_objects = df_result["object_count"].sum()
total_bytes = df_result["total_bytes"].sum()
prefix_count = len(df_result)
print("\n" + "=" * 70)
print(f" AIStor iNVENTORY 분석 결과 | depth = {target_depth}")
print("=" * 70)
print(f" 총 prefix 수 : {prefix_count:,}개")
print(f" 총 object 수 : {total_objects:,}개")
print(f" 총 데이터 크기 : {human_readable_size(total_bytes)}")
print("=" * 70 + "\n")
display_df = df_result[["bucket", "prefix", "object_count", "total_bytes",
"avg_object_size", "object_pct", "size_pct"]].copy()
display_df["total_bytes_hr"] = display_df["total_bytes"].apply(human_readable_size)
display_df["avg_size_hr"] = display_df["avg_object_size"].apply(
lambda x: human_readable_size(x) if not pd.isna(x) else "-"
)
display_df["object_pct_str"] = display_df["object_pct"].apply(lambda x: f"{x:.1f}%")
display_df["size_pct_str"] = display_df["size_pct"].apply(lambda x: f"{x:.1f}%")
show_cols = {
"bucket": "버킷",
"prefix": "Prefix",
"object_count": "객체 수",
"total_bytes_hr": "총 크기",
"avg_size_hr": "평균 크기",
"object_pct_str": "객체 비율",
"size_pct_str": "크기 비율",
}
out = display_df[list(show_cols.keys())].rename(columns=show_cols)
if output_format == "json":
cols = ["bucket", "prefix", "object_count", "total_bytes",
"avg_object_size", "object_pct", "size_pct"]
print(df_result[cols].to_json(orient="records", indent=2, force_ascii=False))
elif output_format == "csv":
print(out.to_csv(index=False))
else:
if HAS_TABULATE:
print(tabulate(out, headers="keys", tablefmt="rounded_outline",
showindex=True, numalign="right"))
else:
print(out.to_string(index=True))
print("\n[TIP] 더 예쁜 출력을 원하면: pip install tabulate")
def save_result(df_result: pd.DataFrame, save_path: str, output_format: str) -> None:
"""결과 파일 저장"""
if output_format == "json":
df_result.to_json(save_path, orient="records", indent=2, force_ascii=False)
elif output_format == "csv":
df_result.to_csv(save_path, index=False, encoding="utf-8-sig")
else:
df_result.to_csv(save_path, index=False, encoding="utf-8-sig")
print(f"\n[INFO] 결과 저장 완료: {save_path}")
def show_schema_info(df: pd.DataFrame) -> None:
"""데이터셋 기본 정보 출력"""
print("\n" + "─" * 50)
print(" 데이터셋 정보")
print("─" * 50)
print(f" 전체 행 수 : {len(df):,}")
print(f" 컬럼 목록 : {list(df.columns)}")
print(f" 버킷 목록 : {sorted(df['bucket'].dropna().unique().tolist())}")
depth_counts = df.groupby("depth").size().to_dict()
print(f" Depth 분포 :")
for d, cnt in sorted(depth_counts.items()):
print(f" depth={d}: {cnt:,}개 prefix")
print("─" * 50 + "\n")
def parse_args():
parser = argparse.ArgumentParser(
description="AIStor iNVENTORY parquet 파일을 depth별로 분석합니다.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
예시:
# depth=2의 모든 prefix 분석 (크기 순 정렬)
python aistor_inventory_analyzer.py --parquet ./reports/ --depth 2
# 특정 버킷만, 크기 기준 Top 20
python aistor_inventory_analyzer.py --parquet report.parquet --depth 3 \\
--bucket my-data-bucket --sort-by size --top 20
# 특정 prefix 하위만 분석, JSON 출력
python aistor_inventory_analyzer.py --parquet ./reports/ --depth 4 \\
--prefix-filter logs/2024/ --output json
# 데이터 구조 확인 (분석 없이 스키마만)
python aistor_inventory_analyzer.py --parquet report.parquet --info-only
""",
)
parser.add_argument(
"--parquet", "-p", required=True,
help="parquet 파일 경로, 디렉토리, 또는 glob 패턴"
)
parser.add_argument(
"--depth", "-d", type=int,
help="분석할 prefix depth (1부터 시작)"
)
parser.add_argument(
"--bucket", "-b", default=None,
help="특정 버킷만 분석 (생략 시 전체 버킷)"
)
parser.add_argument(
"--prefix-filter", default=None,
help="특정 prefix 하위만 분석 (예: logs/2024/)"
)
parser.add_argument(
"--sort-by", choices=["size", "count", "prefix"], default="size",
help="정렬 기준 (default: size)"
)
parser.add_argument(
"--top", type=int, default=None,
help="상위 N개만 출력"
)
parser.add_argument(
"--output", "-o", choices=["table", "csv", "json"], default="table",
help="출력 형식 (default: table)"
)
parser.add_argument(
"--save", "-s", default=None,
help="결과를 파일로 저장할 경로"
)
parser.add_argument(
"--info-only", action="store_true",
help="파일 구조 정보만 출력 (분석 없음)"
)
return parser.parse_args()
def main():
args = parse_args()
df = load_parquet_files(args.parquet)
validate_columns(df)
df["depth"] = pd.to_numeric(df["depth"], errors="coerce").astype("Int64")
df["object_count"] = pd.to_numeric(df["object_count"], errors="coerce").fillna(0)
df["total_bytes"] = pd.to_numeric(df["total_bytes"], errors="coerce").fillna(0)
show_schema_info(df)
if args.info_only:
return
if args.depth is None:
print("[ERROR] --depth 옵션이 필요합니다. --info-only 로 사용 가능한 depth를 확인하세요.")
sys.exit(1)
result = analyze_by_depth(
df,
target_depth=args.depth,
bucket_filter=args.bucket,
prefix_filter=args.prefix_filter,
sort_by=args.sort_by,
top_n=args.top,
)
print_summary(result, args.depth, args.output)
if args.save and not result.empty:
save_result(result, args.save, args.output)
if __name__ == "__main__":
main()