26A10a

Young-Kyoo Kim·2026년 4월 10일
#!/usr/bin/env python3
"""
AIStor iNVENTORY Parquet Analyzer
====================================
MinIO AIStor iNVENTORY API로 생성된 parquet 파일을
특정 depth의 prefix별로 집계하여 분석하는 도구입니다.

사용법:
    python aistor_inventory_analyzer.py \
        --parquet <parquet_file_or_dir> \
        --depth <int> \
        [--bucket <bucket_name>] \
        [--prefix-filter <prefix>] \
        [--sort-by size|count|prefix] \
        [--top <int>] \
        [--output <csv|table|json>] \
        [--save <output_file>]

AIStor iNVENTORY 설정 예시 (YAML):
    apiVersion: v1
    id: my-inventory-job
    destination:
      bucket: inventory-reports
      prefix: reports/
      format: parquet
      compression: on
    schedule: daily
    mode: fast
    versions: current

mc 명령어로 inventory 실행:
    # 설정 파일 생성
    mc inventory generate ALIAS/SOURCE_BUCKET > inventory-config.yaml

    # 설정 적용 및 job 시작
    mc inventory add ALIAS/SOURCE_BUCKET inventory-config.yaml

    # 상태 확인
    mc inventory status ALIAS/SOURCE_BUCKET --id my-inventory-job

    # 결과 parquet 다운로드
    mc cp --recursive ALIAS/inventory-reports/reports/ ./local-reports/
"""

import argparse
import glob
import json
import os
import sys
from pathlib import Path

try:
    import pandas as pd
    import pyarrow.parquet as pq
except ImportError:
    print("[ERROR] 필수 패키지가 없습니다. 아래 명령어로 설치하세요:")
    print("  pip install pandas pyarrow tabulate")
    sys.exit(1)

try:
    from tabulate import tabulate
    HAS_TABULATE = True
except ImportError:
    HAS_TABULATE = False


# ──────────────────────────────────────────────
# 유틸리티 함수
# ──────────────────────────────────────────────

def human_readable_size(size_bytes: float) -> str:
    """바이트를 읽기 쉬운 단위로 변환"""
    if pd.isna(size_bytes) or size_bytes == 0:
        return "0 B"
    for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
        if abs(size_bytes) < 1024.0:
            return f"{size_bytes:,.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:,.2f} EB"


def load_parquet_files(path: str) -> pd.DataFrame:
    """단일 파일 또는 디렉토리의 모든 parquet 파일을 로드"""
    p = Path(path)

    if p.is_file() and p.suffix == ".parquet":
        print(f"[INFO] 파일 로드: {p}")
        return pd.read_parquet(p)

    if p.is_dir():
        files = sorted(glob.glob(str(p / "**" / "*.parquet"), recursive=True))
        if not files:
            files = sorted(glob.glob(str(p / "*.parquet")))
        if not files:
            print(f"[ERROR] '{path}' 디렉토리에서 parquet 파일을 찾을 수 없습니다.")
            sys.exit(1)
        print(f"[INFO] {len(files)}개 parquet 파일 로드 중...")
        dfs = []
        for f in files:
            print(f"  - {f}")
            dfs.append(pd.read_parquet(f))
        return pd.concat(dfs, ignore_index=True)

    # glob 패턴 지원
    files = sorted(glob.glob(path, recursive=True))
    if not files:
        print(f"[ERROR] '{path}' 에서 parquet 파일을 찾을 수 없습니다.")
        sys.exit(1)
    print(f"[INFO] glob 패턴으로 {len(files)}개 파일 로드 중...")
    dfs = [pd.read_parquet(f) for f in files]
    return pd.concat(dfs, ignore_index=True)


def validate_columns(df: pd.DataFrame) -> None:
    """필수 컬럼 존재 여부 확인"""
    required = {"bucket", "prefix", "depth", "object_count", "total_bytes"}
    missing = required - set(df.columns)
    if missing:
        print(f"[ERROR] 필수 컬럼 누락: {missing}")
        print(f"[INFO] 현재 컬럼: {list(df.columns)}")
        sys.exit(1)


# ──────────────────────────────────────────────
# 핵심 분석 함수
# ──────────────────────────────────────────────

def analyze_by_depth(
    df: pd.DataFrame,
    target_depth: int,
    bucket_filter: str = None,
    prefix_filter: str = None,
    sort_by: str = "size",
    top_n: int = None,
) -> pd.DataFrame:
    """
    특정 depth의 prefix별로 object_count와 total_bytes를 집계합니다.

    AIStor iNVENTORY parquet의 depth는 prefix의 '/' 구분자 깊이를 나타냅니다.
    예: depth=1 → 'logs/', depth=2 → 'logs/2024/', depth=3 → 'logs/2024/01/'
    """
    filtered = df.copy()

    # 버킷 필터
    if bucket_filter:
        filtered = filtered[filtered["bucket"] == bucket_filter]
        if filtered.empty:
            print(f"[WARN] 버킷 '{bucket_filter}'에 해당하는 데이터가 없습니다.")
            return pd.DataFrame()

    # prefix 필터 (특정 prefix 하위만 분석)
    if prefix_filter:
        mask = filtered["prefix"].str.startswith(prefix_filter, na=False)
        filtered = filtered[mask]
        if filtered.empty:
            print(f"[WARN] prefix '{prefix_filter}'에 해당하는 데이터가 없습니다.")
            return pd.DataFrame()

    # depth 필터
    depth_df = filtered[filtered["depth"] == target_depth].copy()

    if depth_df.empty:
        available_depths = sorted(filtered["depth"].dropna().unique().tolist())
        print(f"[WARN] depth={target_depth}에 해당하는 데이터가 없습니다.")
        print(f"[INFO] 사용 가능한 depth 값: {available_depths}")
        return pd.DataFrame()

    # bucket + prefix 기준으로 집계
    result = (
        depth_df.groupby(["bucket", "prefix"], as_index=False)
        .agg(
            object_count=("object_count", "sum"),
            total_bytes=("total_bytes", "sum"),
        )
    )

    # 평균 object 크기 계산
    result["avg_object_size"] = (
        result["total_bytes"] / result["object_count"].replace(0, float("nan"))
    )

    # 비율 계산
    total_objects = result["object_count"].sum()
    total_bytes = result["total_bytes"].sum()
    result["object_pct"] = (result["object_count"] / total_objects * 100).round(2)
    result["size_pct"] = (result["total_bytes"] / total_bytes * 100).round(2)

    # 정렬
    sort_map = {
        "size": "total_bytes",
        "count": "object_count",
        "prefix": "prefix",
    }
    sort_col = sort_map.get(sort_by, "total_bytes")
    ascending = sort_col == "prefix"
    result = result.sort_values(sort_col, ascending=ascending).reset_index(drop=True)

    # Top N 제한
    if top_n:
        result = result.head(top_n)

    return result


def print_summary(df_result: pd.DataFrame, target_depth: int, output_format: str) -> None:
    """분석 결과 출력"""
    if df_result.empty:
        print("[INFO] 출력할 결과가 없습니다.")
        return

    total_objects = df_result["object_count"].sum()
    total_bytes = df_result["total_bytes"].sum()
    prefix_count = len(df_result)

    print("\n" + "=" * 70)
    print(f"  AIStor iNVENTORY 분석 결과  |  depth = {target_depth}")
    print("=" * 70)
    print(f"  총 prefix 수    : {prefix_count:,}개")
    print(f"  총 object 수    : {total_objects:,}개")
    print(f"  총 데이터 크기  : {human_readable_size(total_bytes)}")
    print("=" * 70 + "\n")

    # 표시용 DataFrame 생성
    display_df = df_result[["bucket", "prefix", "object_count", "total_bytes",
                              "avg_object_size", "object_pct", "size_pct"]].copy()

    display_df["total_bytes_hr"] = display_df["total_bytes"].apply(human_readable_size)
    display_df["avg_size_hr"] = display_df["avg_object_size"].apply(
        lambda x: human_readable_size(x) if not pd.isna(x) else "-"
    )
    display_df["object_pct_str"] = display_df["object_pct"].apply(lambda x: f"{x:.1f}%")
    display_df["size_pct_str"] = display_df["size_pct"].apply(lambda x: f"{x:.1f}%")

    show_cols = {
        "bucket": "버킷",
        "prefix": "Prefix",
        "object_count": "객체 수",
        "total_bytes_hr": "총 크기",
        "avg_size_hr": "평균 크기",
        "object_pct_str": "객체 비율",
        "size_pct_str": "크기 비율",
    }
    out = display_df[list(show_cols.keys())].rename(columns=show_cols)

    if output_format == "json":
        # JSON은 원본 숫자값 출력
        cols = ["bucket", "prefix", "object_count", "total_bytes",
                "avg_object_size", "object_pct", "size_pct"]
        print(df_result[cols].to_json(orient="records", indent=2, force_ascii=False))

    elif output_format == "csv":
        print(out.to_csv(index=False))

    else:  # table (default)
        if HAS_TABULATE:
            print(tabulate(out, headers="keys", tablefmt="rounded_outline",
                           showindex=True, numalign="right"))
        else:
            print(out.to_string(index=True))
            print("\n[TIP] 더 예쁜 출력을 원하면: pip install tabulate")


def save_result(df_result: pd.DataFrame, save_path: str, output_format: str) -> None:
    """결과 파일 저장"""
    if output_format == "json":
        df_result.to_json(save_path, orient="records", indent=2, force_ascii=False)
    elif output_format == "csv":
        df_result.to_csv(save_path, index=False, encoding="utf-8-sig")
    else:
        df_result.to_csv(save_path, index=False, encoding="utf-8-sig")
    print(f"\n[INFO] 결과 저장 완료: {save_path}")


def show_schema_info(df: pd.DataFrame) -> None:
    """데이터셋 기본 정보 출력"""
    print("\n" + "─" * 50)
    print("  데이터셋 정보")
    print("─" * 50)
    print(f"  전체 행 수    : {len(df):,}")
    print(f"  컬럼 목록     : {list(df.columns)}")
    print(f"  버킷 목록     : {sorted(df['bucket'].dropna().unique().tolist())}")
    depth_counts = df.groupby("depth").size().to_dict()
    print(f"  Depth 분포    :")
    for d, cnt in sorted(depth_counts.items()):
        print(f"    depth={d}: {cnt:,}개 prefix")
    print("─" * 50 + "\n")


# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────

def parse_args():
    parser = argparse.ArgumentParser(
        description="AIStor iNVENTORY parquet 파일을 depth별로 분석합니다.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
예시:
  # depth=2의 모든 prefix 분석 (크기 순 정렬)
  python aistor_inventory_analyzer.py --parquet ./reports/ --depth 2

  # 특정 버킷만, 크기 기준 Top 20
  python aistor_inventory_analyzer.py --parquet report.parquet --depth 3 \\
      --bucket my-data-bucket --sort-by size --top 20

  # 특정 prefix 하위만 분석, JSON 출력
  python aistor_inventory_analyzer.py --parquet ./reports/ --depth 4 \\
      --prefix-filter logs/2024/ --output json

  # 데이터 구조 확인 (분석 없이 스키마만)
  python aistor_inventory_analyzer.py --parquet report.parquet --info-only
        """,
    )
    parser.add_argument(
        "--parquet", "-p", required=True,
        help="parquet 파일 경로, 디렉토리, 또는 glob 패턴"
    )
    parser.add_argument(
        "--depth", "-d", type=int,
        help="분석할 prefix depth (1부터 시작)"
    )
    parser.add_argument(
        "--bucket", "-b", default=None,
        help="특정 버킷만 분석 (생략 시 전체 버킷)"
    )
    parser.add_argument(
        "--prefix-filter", default=None,
        help="특정 prefix 하위만 분석 (예: logs/2024/)"
    )
    parser.add_argument(
        "--sort-by", choices=["size", "count", "prefix"], default="size",
        help="정렬 기준 (default: size)"
    )
    parser.add_argument(
        "--top", type=int, default=None,
        help="상위 N개만 출력"
    )
    parser.add_argument(
        "--output", "-o", choices=["table", "csv", "json"], default="table",
        help="출력 형식 (default: table)"
    )
    parser.add_argument(
        "--save", "-s", default=None,
        help="결과를 파일로 저장할 경로"
    )
    parser.add_argument(
        "--info-only", action="store_true",
        help="파일 구조 정보만 출력 (분석 없음)"
    )
    return parser.parse_args()


def main():
    args = parse_args()

    # 1. 데이터 로드
    df = load_parquet_files(args.parquet)
    validate_columns(df)

    # 타입 정규화
    df["depth"] = pd.to_numeric(df["depth"], errors="coerce").astype("Int64")
    df["object_count"] = pd.to_numeric(df["object_count"], errors="coerce").fillna(0)
    df["total_bytes"] = pd.to_numeric(df["total_bytes"], errors="coerce").fillna(0)

    # 2. 스키마 정보 출력
    show_schema_info(df)

    if args.info_only:
        return

    if args.depth is None:
        print("[ERROR] --depth 옵션이 필요합니다. --info-only 로 사용 가능한 depth를 확인하세요.")
        sys.exit(1)

    # 3. 분석
    result = analyze_by_depth(
        df,
        target_depth=args.depth,
        bucket_filter=args.bucket,
        prefix_filter=args.prefix_filter,
        sort_by=args.sort_by,
        top_n=args.top,
    )

    # 4. 결과 출력
    print_summary(result, args.depth, args.output)

    # 5. 파일 저장
    if args.save and not result.empty:
        save_result(result, args.save, args.output)


if __name__ == "__main__":
    main()

0개의 댓글