26A10b

Young-Kyoo Kim·2026년 4월 10일
#!/usr/bin/env python3
"""
AIStor iNVENTORY Parquet Analyzer
====================================
MinIO AIStor iNVENTORY 결과 parquet 파일 컬럼:
  Bucket, Key, SequenceNumber, Size, LastModifiedDate

Key 예시:
  logs/2024/01/app.log       → depth=1: logs/
                              → depth=2: logs/2024/
                              → depth=3: logs/2024/01/
  images/raw/photo.jpg       → depth=1: images/
                              → depth=2: images/raw/

사용법:
  python aistor_inventory_analyzer.py --parquet <경로> --depth <int> [옵션]

옵션:
  --parquet, -p   parquet 파일 / 디렉토리 / glob 패턴  (필수)
  --depth,   -d   분석할 prefix depth (필수, --info-only 제외)
  --bucket,  -b   특정 버킷만 분석
  --prefix-filter 특정 prefix 하위만 분석 (예: logs/2024/)
  --sort-by       size | count | prefix  (default: size)
  --top           상위 N개만 출력
  --output, -o    table | csv | json     (default: table)
  --save,   -s    결과 파일 저장 경로
  --info-only     스키마·버킷 목록·depth 분포만 출력
"""

import argparse
import glob
import sys
from pathlib import Path

try:
    import pandas as pd
except ImportError:
    print("[ERROR] pandas가 없습니다: pip install pandas pyarrow")
    sys.exit(1)

try:
    from tabulate import tabulate
    HAS_TABULATE = True
except ImportError:
    HAS_TABULATE = False


# ──────────────────────────────────────────────────────────
# 유틸
# ──────────────────────────────────────────────────────────

def human_size(n: float) -> str:
    if pd.isna(n) or n == 0:
        return "0 B"
    for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
        if abs(n) < 1024.0:
            return f"{n:,.2f} {unit}"
        n /= 1024.0
    return f"{n:,.2f} EB"


def load_parquets(path: str) -> pd.DataFrame:
    p = Path(path)

    if p.is_file():
        print(f"[INFO] 파일 로드: {p}")
        return pd.read_parquet(p)

    if p.is_dir():
        files = sorted(p.rglob("*.parquet"))
        if not files:
            sys.exit(f"[ERROR] '{path}' 에 parquet 파일이 없습니다.")
        print(f"[INFO] {len(files)}개 parquet 파일 로드 중...")
        for f in files:
            print(f"  · {f}")
        return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)

    # glob 패턴
    files = sorted(glob.glob(path, recursive=True))
    if not files:
        sys.exit(f"[ERROR] '{path}' 에 해당하는 파일이 없습니다.")
    return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)


def validate_columns(df: pd.DataFrame) -> None:
    required = {"Bucket", "Key", "Size"}
    missing = required - set(df.columns)
    if missing:
        sys.exit(f"[ERROR] 필수 컬럼 누락: {missing}\n현재 컬럼: {list(df.columns)}")


# ──────────────────────────────────────────────────────────
# Key → prefix 파생
# ──────────────────────────────────────────────────────────

def extract_prefix_at_depth(key: str, depth: int) -> str:
    """
    Key를 '/' 로 분리해 depth 단계까지의 prefix 를 반환.
    마지막 세그먼트(파일명)는 포함하지 않음.

    예)  key='logs/2024/01/app.log', depth=1 → 'logs/'
         key='logs/2024/01/app.log', depth=2 → 'logs/2024/'
         key='logs/2024/01/app.log', depth=3 → 'logs/2024/01/'
         key='top-level-file.txt',  depth=1 → '(root)'
    """
    parts = key.rstrip("/").split("/")
    dir_parts = parts[:-1]          # 파일명 제외, 디렉토리 세그먼트만

    if not dir_parts or len(dir_parts) < depth:
        return "(root)"

    return "/".join(dir_parts[:depth]) + "/"


def add_prefix_column(df: pd.DataFrame, depth: int) -> pd.DataFrame:
    df = df.copy()
    df["_prefix"] = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), depth))
    return df


# ──────────────────────────────────────────────────────────
# 분석
# ──────────────────────────────────────────────────────────

def analyze(
    df: pd.DataFrame,
    depth: int,
    bucket_filter: str = None,
    prefix_filter: str = None,
    sort_by: str = "size",
    top_n: int = None,
) -> pd.DataFrame:

    work = df.copy()

    # 버킷 필터
    if bucket_filter:
        work = work[work["Bucket"] == bucket_filter]
        if work.empty:
            print(f"[WARN] 버킷 '{bucket_filter}' 에 해당하는 데이터가 없습니다.")
            print(f"[INFO] 버킷 목록: {sorted(df['Bucket'].dropna().unique())}")
            return pd.DataFrame()

    # prefix 필터 (특정 경로 하위만 드릴다운)
    if prefix_filter:
        work = work[work["Key"].str.startswith(prefix_filter, na=False)]
        if work.empty:
            print(f"[WARN] prefix '{prefix_filter}' 에 해당하는 Key가 없습니다.")
            return pd.DataFrame()

    # depth 기준 prefix 컬럼 추가
    work = add_prefix_column(work, depth)

    # bucket + prefix 기준 집계
    result = (
        work.groupby(["Bucket", "_prefix"], as_index=False)
        .agg(
            object_count=("Key",  "count"),
            total_bytes =("Size", "sum"),
        )
        .rename(columns={"_prefix": "prefix"})
    )

    # 평균 객체 크기
    result["avg_object_size"] = (
        result["total_bytes"] / result["object_count"].replace(0, float("nan"))
    )

    # 비율
    total_obj   = result["object_count"].sum()
    total_bytes = result["total_bytes"].sum()
    result["object_pct"] = (result["object_count"] / total_obj   * 100).round(2)
    result["size_pct"]   = (result["total_bytes"]   / total_bytes * 100).round(2)

    # 정렬
    sort_col = {"size": "total_bytes", "count": "object_count", "prefix": "prefix"}[sort_by]
    result = result.sort_values(sort_col, ascending=(sort_col == "prefix")).reset_index(drop=True)

    if top_n:
        result = result.head(top_n)

    return result


# ──────────────────────────────────────────────────────────
# 출력
# ──────────────────────────────────────────────────────────

def print_result(result: pd.DataFrame, depth: int, fmt: str) -> None:
    if result.empty:
        print("[INFO] 출력할 결과가 없습니다.")
        return

    total_obj   = result["object_count"].sum()
    total_bytes = result["total_bytes"].sum()

    print("\n" + "=" * 72)
    print(f"  AIStor iNVENTORY 분석  |  depth = {depth}")
    print("=" * 72)
    print(f"  prefix 수      : {len(result):,}개")
    print(f"  총 object 수   : {total_obj:,}개")
    print(f"  총 데이터 크기 : {human_size(total_bytes)}")
    print("=" * 72 + "\n")

    if fmt == "json":
        print(result.to_json(orient="records", indent=2, force_ascii=False))
        return

    if fmt == "csv":
        print(result.to_csv(index=False))
        return

    # table
    disp = result.assign(
        total_size = result["total_bytes"].apply(human_size),
        avg_size   = result["avg_object_size"].apply(
                         lambda x: human_size(x) if not pd.isna(x) else "-"),
        obj_pct    = result["object_pct"].apply(lambda x: f"{x:.1f}%"),
        size_pct   = result["size_pct"].apply(lambda x: f"{x:.1f}%"),
    )[[
        "Bucket", "prefix", "object_count", "total_size",
        "avg_size", "obj_pct", "size_pct",
    ]].rename(columns={
        "Bucket":       "버킷",
        "prefix":       "Prefix",
        "object_count": "객체 수",
        "total_size":   "총 크기",
        "avg_size":     "평균 크기",
        "obj_pct":      "객체 비율",
        "size_pct":     "크기 비율",
    })

    if HAS_TABULATE:
        print(tabulate(disp, headers="keys", tablefmt="rounded_outline",
                       showindex=True, numalign="right"))
    else:
        print(disp.to_string(index=True))
        print("\n[TIP] pip install tabulate 로 더 예쁜 출력 가능")


def print_info(df: pd.DataFrame) -> None:
    """--info-only: 스키마 및 depth 분포 미리보기"""
    print("\n" + "─" * 54)
    print("  데이터셋 기본 정보")
    print("─" * 54)
    print(f"  총 object 수  : {len(df):,}")
    print(f"  컬럼          : {list(df.columns)}")
    print(f"  총 데이터 크기: {human_size(df['Size'].sum())}")
    print(f"  버킷 목록     : {sorted(df['Bucket'].dropna().unique().tolist())}")

    print("\n  depth별 unique prefix 수 (참고용):")
    for d in range(1, 7):
        n = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), d)).nunique()
        print(f"    depth={d}: {n:>8,}개 prefix")
    print("─" * 54 + "\n")
    print("[TIP] --depth 값을 위 표를 참고해 선택하세요.")


def save_result(result: pd.DataFrame, path: str, fmt: str) -> None:
    if fmt == "json":
        result.to_json(path, orient="records", indent=2, force_ascii=False)
    else:
        result.to_csv(path, index=False, encoding="utf-8-sig")
    print(f"[INFO] 저장 완료: {path}")


# ──────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────

def parse_args():
    p = argparse.ArgumentParser(
        description="AIStor iNVENTORY parquet → depth별 prefix 집계 분석",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
예시:
  # 스키마 및 depth 분포 확인 (먼저 실행 권장)
  python aistor_inventory_analyzer.py -p ./reports/ --info-only

  # depth=2, 크기 순 정렬
  python aistor_inventory_analyzer.py -p ./reports/ -d 2

  # 특정 버킷, depth=3, Top 30, CSV 저장
  python aistor_inventory_analyzer.py -p ./reports/ -d 3 \\
      --bucket my-bucket --sort-by size --top 30 --save out.csv

  # logs/2024/ 하위만 depth=4 드릴다운, JSON 출력
  python aistor_inventory_analyzer.py -p report.parquet -d 4 \\
      --prefix-filter logs/2024/ -o json
        """,
    )
    p.add_argument("--parquet",        "-p", required=True,
                   help="parquet 파일 / 디렉토리 / glob 패턴")
    p.add_argument("--depth",          "-d", type=int,
                   help="분석할 prefix depth (1부터 시작)")
    p.add_argument("--bucket",         "-b", default=None,
                   help="특정 버킷만 분석")
    p.add_argument("--prefix-filter",        default=None,
                   help="Key prefix 필터 (예: logs/2024/)")
    p.add_argument("--sort-by",
                   choices=["size", "count", "prefix"], default="size",
                   help="정렬 기준 (default: size)")
    p.add_argument("--top",            type=int, default=None,
                   help="상위 N개만 출력")
    p.add_argument("--output", "-o",
                   choices=["table", "csv", "json"], default="table",
                   help="출력 형식 (default: table)")
    p.add_argument("--save",   "-s",   default=None,
                   help="결과 저장 경로 (.csv 또는 .json)")
    p.add_argument("--info-only",      action="store_true",
                   help="스키마 정보만 출력하고 종료")
    return p.parse_args()


def main():
    args = parse_args()

    df = load_parquets(args.parquet)
    validate_columns(df)

    # 타입 정규화
    df["Size"] = pd.to_numeric(df["Size"], errors="coerce").fillna(0)

    if args.info_only:
        print_info(df)
        return

    if args.depth is None:
        sys.exit("[ERROR] --depth 가 필요합니다. 먼저 --info-only 로 depth 분포를 확인하세요.")

    result = analyze(
        df,
        depth=args.depth,
        bucket_filter=args.bucket,
        prefix_filter=args.prefix_filter,
        sort_by=args.sort_by,
        top_n=args.top,
    )

    print_result(result, args.depth, args.output)

    if args.save and not result.empty:
        save_result(result, args.save, args.output)


if __name__ == "__main__":
    main()

0개의 댓글