26A10d

Young-Kyoo Kim·2026년 4월 10일
#!/usr/bin/env python3
"""
AIStor iNVENTORY Analyzer  (원격 직접 읽기 버전)
=================================================
MinIO AIStor 버킷에 쌓인 iNVENTORY 결과를
로컬 복사 없이 Python에서 직접 스트리밍하여 분석합니다.

저장 경로 구조:
  <DEST_BUCKET>/<PREFIX>/<SOURCE_BUCKET>/<INVENTORY_ID>/<TIMESTAMP>/
      manifest.json
      files/
          file-001.parquet
          file-002.parquet
          ...

parquet 컬럼:
  Bucket, Key, SequenceNumber, Size, LastModifiedDate

사용법:
  # 기본 (최신 timestamp 자동 선택)
  python aistor_inventory_analyzer.py \\
      --endpoint play.min.io:9000 \\
      --access-key Q3AM3UQ867SPQQA43P2F \\
      --secret-key zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG \\
      --dest-bucket inventory-reports \\
      --source-bucket devops-test \\
      --inventory-id my-job-id \\
      --depth 2

  # .env 파일 또는 환경변수로 인증 정보 관리 (권장)
  export AISTOR_ENDPOINT=play.min.io:9000
  export AISTOR_ACCESS_KEY=...
  export AISTOR_SECRET_KEY=...
  python aistor_inventory_analyzer.py \\
      --dest-bucket inventory-reports \\
      --source-bucket devops-test \\
      --inventory-id my-job-id \\
      --depth 2

  # 특정 timestamp 지정 (생략 시 최신 자동 선택)
  python aistor_inventory_analyzer.py ... --timestamp 2025-01-15T03-00Z

  # 구조 확인만 (분석 없이 어떤 job/timestamp가 있는지 확인)
  python aistor_inventory_analyzer.py ... --list-jobs
  python aistor_inventory_analyzer.py ... --info-only
"""

import argparse
import io
import json
import os
import sys

try:
    import pandas as pd
    import pyarrow.parquet as pq
    from minio import Minio
    from minio.error import S3Error
except ImportError as e:
    print(f"[ERROR] 필수 패키지 부족: {e}")
    print("  pip install minio pyarrow pandas tabulate")
    sys.exit(1)

try:
    from tabulate import tabulate
    HAS_TABULATE = True
except ImportError:
    HAS_TABULATE = False


# ──────────────────────────────────────────────────────────
# 유틸
# ──────────────────────────────────────────────────────────

def human_size(n: float) -> str:
    if pd.isna(n) or n == 0:
        return "0 B"
    for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
        if abs(n) < 1024.0:
            return f"{n:,.2f} {unit}"
        n /= 1024.0
    return f"{n:,.2f} EB"


def make_client(endpoint: str, access_key: str, secret_key: str, secure: bool) -> Minio:
    return Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=secure,
    )


# ──────────────────────────────────────────────────────────
# MinIO 경로 탐색
# ──────────────────────────────────────────────────────────

def list_jobs(client: Minio, dest_bucket: str, prefix: str,
              source_bucket: str) -> dict[str, list[str]]:
    """
    dest_bucket 안에서 source_bucket 하위의 inventory job ID와
    각 job의 timestamp 목록을 반환합니다.

    탐색 경로: <prefix>/<source_bucket>/<job_id>/<timestamp>/manifest.json
    """
    base = f"{prefix}/{source_bucket}/" if prefix else f"{source_bucket}/"
    base = base.lstrip("/")

    jobs: dict[str, list[str]] = {}
    try:
        objects = client.list_objects(dest_bucket, prefix=base, recursive=True)
        for obj in objects:
            key = obj.object_name  # e.g. "reports/devops-test/my-job/2025-01-15T03-00Z/manifest.json"
            if not key.endswith("manifest.json"):
                continue
            # base 이후 파트: <job_id>/<timestamp>/manifest.json
            rel = key[len(base):]
            parts = rel.split("/")
            if len(parts) >= 3:
                job_id, timestamp = parts[0], parts[1]
                jobs.setdefault(job_id, []).append(timestamp)
    except S3Error as e:
        sys.exit(f"[ERROR] 버킷 조회 실패: {e}")

    # timestamp 최신순 정렬
    for jid in jobs:
        jobs[jid] = sorted(jobs[jid], reverse=True)
    return jobs


def resolve_manifest_key(prefix: str, source_bucket: str,
                          inventory_id: str, timestamp: str) -> str:
    """manifest.json의 오브젝트 키 조립"""
    parts = [p for p in [prefix, source_bucket, inventory_id, timestamp, "manifest.json"] if p]
    return "/".join(parts)


def read_manifest(client: Minio, dest_bucket: str, manifest_key: str) -> dict:
    """manifest.json 읽기"""
    try:
        resp = client.get_object(dest_bucket, manifest_key)
        data = json.loads(resp.read().decode("utf-8"))
        resp.close()
        return data
    except S3Error as e:
        sys.exit(f"[ERROR] manifest 읽기 실패 ({manifest_key}): {e}")


def resolve_parquet_keys(manifest: dict, dest_bucket: str,
                         prefix: str, source_bucket: str,
                         inventory_id: str, timestamp: str) -> list[str]:
    """
    manifest의 files 목록에서 parquet 오브젝트 키 목록을 반환.
    manifest의 key 필드가 절대경로면 그대로, 상대경로면 base_path 를 앞에 붙임.
    """
    files = manifest.get("files", [])
    if not files:
        sys.exit("[ERROR] manifest에 files 항목이 없습니다.")

    keys = []
    base = "/".join(p for p in [prefix, source_bucket, inventory_id, timestamp, "files"] if p)

    for f in files:
        # f 는 {"key": "...", "size": ...} 또는 문자열일 수 있음
        raw_key = f["key"] if isinstance(f, dict) else str(f)

        # 절대 경로(버킷명 포함) vs 상대 경로 처리
        if raw_key.startswith(dest_bucket + "/"):
            raw_key = raw_key[len(dest_bucket) + 1:]

        # 상대 경로인 경우 base 앞에 붙임
        if not raw_key.startswith(source_bucket) and not raw_key.startswith(prefix or source_bucket):
            obj_key = f"{base}/{raw_key.lstrip('/')}"
        else:
            obj_key = raw_key

        keys.append(obj_key)

    return keys


# ──────────────────────────────────────────────────────────
# 원격 parquet 스트리밍 읽기
# ──────────────────────────────────────────────────────────

def stream_parquet(client: Minio, dest_bucket: str, object_key: str) -> pd.DataFrame:
    """
    MinIO에서 parquet 파일을 메모리로 스트리밍하여 DataFrame 반환.
    256 MB 파일도 청크 없이 pyarrow가 처리 (columnar 포맷 특성상 효율적).
    """
    try:
        resp = client.get_object(dest_bucket, object_key)
        buf = io.BytesIO(resp.read())
        resp.close()
    except S3Error as e:
        print(f"  [WARN] 파일 읽기 실패, 건너뜀 ({object_key}): {e}")
        return pd.DataFrame()

    try:
        table = pq.read_table(buf)
        return table.to_pandas()
    except Exception as e:
        print(f"  [WARN] parquet 파싱 실패, 건너뜀 ({object_key}): {e}")
        return pd.DataFrame()


def load_all_parquets(client: Minio, dest_bucket: str,
                      parquet_keys: list[str]) -> pd.DataFrame:
    """모든 parquet 파일을 순차 스트리밍 후 병합"""
    dfs = []
    total = len(parquet_keys)
    for i, key in enumerate(parquet_keys, 1):
        fname = key.split("/")[-1]
        print(f"  [{i}/{total}] {fname} 읽는 중...", end=" ", flush=True)
        df = stream_parquet(client, dest_bucket, key)
        if not df.empty:
            print(f"{len(df):,} rows")
            dfs.append(df)
        else:
            print("(건너뜀)")

    if not dfs:
        sys.exit("[ERROR] 읽을 수 있는 parquet 파일이 없습니다.")

    result = pd.concat(dfs, ignore_index=True)
    print(f"\n[INFO] 총 {len(result):,} rows 로드 완료")
    return result


# ──────────────────────────────────────────────────────────
# Key → prefix 파생
# ──────────────────────────────────────────────────────────

def extract_prefix_at_depth(key: str, depth: int) -> str:
    """
    Key의 디렉토리 경로를 depth 단계까지 잘라 prefix 반환.

    예) key='logs/2024/01/app.log', depth=2  →  'logs/2024/'
        key='top-level.txt',       depth=1  →  '(root)'
    """
    parts = key.rstrip("/").split("/")
    dir_parts = parts[:-1]   # 파일명(마지막) 제외
    if len(dir_parts) < depth:
        return "(root)"
    return "/".join(dir_parts[:depth]) + "/"


# ──────────────────────────────────────────────────────────
# 분석
# ──────────────────────────────────────────────────────────

def analyze(df: pd.DataFrame, depth: int,
            bucket_filter: str = None,
            prefix_filter: str = None,
            sort_by: str = "size",
            top_n: int = None) -> pd.DataFrame:

    work = df.copy()

    if bucket_filter:
        work = work[work["Bucket"] == bucket_filter]
        if work.empty:
            print(f"[WARN] 버킷 '{bucket_filter}' 데이터 없음")
            print(f"[INFO] 버킷 목록: {sorted(df['Bucket'].dropna().unique())}")
            return pd.DataFrame()

    if prefix_filter:
        work = work[work["Key"].str.startswith(prefix_filter, na=False)]
        if work.empty:
            print(f"[WARN] prefix '{prefix_filter}' 에 해당하는 Key 없음")
            return pd.DataFrame()

    work["_prefix"] = work["Key"].apply(lambda k: extract_prefix_at_depth(str(k), depth))

    result = (
        work.groupby(["Bucket", "_prefix"], as_index=False)
        .agg(object_count=("Key", "count"), total_bytes=("Size", "sum"))
        .rename(columns={"_prefix": "prefix"})
    )

    result["avg_object_size"] = (
        result["total_bytes"] / result["object_count"].replace(0, float("nan"))
    )
    tot_obj   = result["object_count"].sum()
    tot_bytes = result["total_bytes"].sum()
    result["object_pct"] = (result["object_count"] / tot_obj   * 100).round(2)
    result["size_pct"]   = (result["total_bytes"]   / tot_bytes * 100).round(2)

    sort_col = {"size": "total_bytes", "count": "object_count", "prefix": "prefix"}[sort_by]
    result = result.sort_values(sort_col, ascending=(sort_col == "prefix")).reset_index(drop=True)

    if top_n:
        result = result.head(top_n)
    return result


# ──────────────────────────────────────────────────────────
# 출력
# ──────────────────────────────────────────────────────────

def print_result(result: pd.DataFrame, depth: int, fmt: str) -> None:
    if result.empty:
        print("[INFO] 출력할 결과가 없습니다.")
        return

    tot_obj   = result["object_count"].sum()
    tot_bytes = result["total_bytes"].sum()

    print("\n" + "=" * 72)
    print(f"  AIStor iNVENTORY 분석  |  depth = {depth}")
    print("=" * 72)
    print(f"  prefix 수      : {len(result):,}개")
    print(f"  총 object 수   : {tot_obj:,}개")
    print(f"  총 데이터 크기 : {human_size(tot_bytes)}")
    print("=" * 72 + "\n")

    if fmt == "json":
        print(result.to_json(orient="records", indent=2, force_ascii=False))
        return
    if fmt == "csv":
        print(result.to_csv(index=False))
        return

    disp = result.assign(
        total_size = result["total_bytes"].apply(human_size),
        avg_size   = result["avg_object_size"].apply(
                         lambda x: human_size(x) if not pd.isna(x) else "-"),
        obj_pct    = result["object_pct"].apply(lambda x: f"{x:.1f}%"),
        size_pct   = result["size_pct"].apply(lambda x: f"{x:.1f}%"),
    )[[
        "Bucket", "prefix", "object_count", "total_size", "avg_size", "obj_pct", "size_pct",
    ]].rename(columns={
        "Bucket": "버킷", "prefix": "Prefix", "object_count": "객체 수",
        "total_size": "총 크기", "avg_size": "평균 크기",
        "obj_pct": "객체 비율", "size_pct": "크기 비율",
    })

    if HAS_TABULATE:
        print(tabulate(disp, headers="keys", tablefmt="rounded_outline",
                       showindex=True, numalign="right"))
    else:
        print(disp.to_string(index=True))
        print("\n[TIP] pip install tabulate  →  더 예쁜 테이블 출력")


def print_info(df: pd.DataFrame) -> None:
    print("\n" + "─" * 54)
    print("  데이터셋 기본 정보")
    print("─" * 54)
    print(f"  총 object 수  : {len(df):,}")
    print(f"  컬럼          : {list(df.columns)}")
    print(f"  총 데이터 크기: {human_size(df['Size'].sum())}")
    print(f"  버킷 목록     : {sorted(df['Bucket'].dropna().unique().tolist())}")
    print("\n  depth별 unique prefix 수:")
    for d in range(1, 7):
        n = df["Key"].apply(lambda k: extract_prefix_at_depth(str(k), d)).nunique()
        print(f"    depth={d}: {n:>8,}개 prefix")
    print("─" * 54)
    print("[TIP] --depth 값을 위 표를 참고해 선택하세요.\n")


def save_result(result: pd.DataFrame, path: str, fmt: str) -> None:
    if fmt == "json":
        result.to_json(path, orient="records", indent=2, force_ascii=False)
    else:
        result.to_csv(path, index=False, encoding="utf-8-sig")
    print(f"[INFO] 저장 완료: {path}")


# ──────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────

def parse_args():
    p = argparse.ArgumentParser(
        description="AIStor iNVENTORY parquet를 원격에서 직접 읽어 depth별 prefix 집계",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
환경변수 (--access-key/--secret-key 대신 사용 가능):
  AISTOR_ENDPOINT    MinIO endpoint  (예: minio.example.com:9000)
  AISTOR_ACCESS_KEY  액세스 키
  AISTOR_SECRET_KEY  시크릿 키

예시:
  # job 목록과 timestamp 확인
  python aistor_inventory_analyzer.py \\
      --endpoint minio.example.com:9000 \\
      --access-key AKID --secret-key SKID \\
      --dest-bucket inventory-reports --source-bucket devops-test \\
      --list-jobs

  # 최신 timestamp 자동 선택, depth=2 분석
  python aistor_inventory_analyzer.py \\
      --endpoint minio.example.com:9000 \\
      --access-key AKID --secret-key SKID \\
      --dest-bucket inventory-reports --source-bucket devops-test \\
      --inventory-id my-job-id --depth 2

  # 특정 timestamp 지정, Top 30, CSV 저장
  python aistor_inventory_analyzer.py \\
      --endpoint minio.example.com:9000 \\
      --access-key AKID --secret-key SKID \\
      --dest-bucket inventory-reports --source-bucket devops-test \\
      --inventory-id my-job-id \\
      --timestamp 2025-01-15T03-00Z \\
      --depth 3 --sort-by size --top 30 --save result.csv

  # prefix 드릴다운 + JSON 출력
  python aistor_inventory_analyzer.py ... \\
      --inventory-id my-job-id --depth 4 \\
      --prefix-filter logs/2024/ -o json
        """,
    )

    # 연결 정보
    conn = p.add_argument_group("MinIO 연결 정보")
    conn.add_argument("--endpoint",   "-e",
                      default=os.environ.get("AISTOR_ENDPOINT"),
                      help="MinIO endpoint (예: minio.example.com:9000)")
    conn.add_argument("--access-key",
                      default=os.environ.get("AISTOR_ACCESS_KEY"),
                      help="액세스 키 (환경변수 AISTOR_ACCESS_KEY 도 가능)")
    conn.add_argument("--secret-key",
                      default=os.environ.get("AISTOR_SECRET_KEY"),
                      help="시크릿 키 (환경변수 AISTOR_SECRET_KEY 도 가능)")
    conn.add_argument("--no-tls",     action="store_true",
                      help="TLS 비활성화 (http 접속)")

    # 경로 정보
    path = p.add_argument_group("inventory 경로")
    path.add_argument("--dest-bucket",   required=True,
                      help="inventory 결과가 저장된 버킷명")
    path.add_argument("--prefix",        default="",
                      help="dest-bucket 안의 prefix (설정 YAML의 destination.prefix, 없으면 생략)")
    path.add_argument("--source-bucket", required=True,
                      help="인벤토리를 생성한 원본 버킷명")
    path.add_argument("--inventory-id",
                      help="inventory job ID (--list-jobs 생략 시 필수)")
    path.add_argument("--timestamp",     default=None,
                      help="특정 실행 timestamp (예: 2025-01-15T03-00Z). 생략 시 최신 자동 선택")

    # 분석 옵션
    ana = p.add_argument_group("분석 옵션")
    ana.add_argument("--depth",         "-d", type=int,
                     help="분석할 prefix depth (1~)")
    ana.add_argument("--bucket-filter", "-b", default=None,
                     help="특정 Bucket 컬럼 값으로 필터")
    ana.add_argument("--prefix-filter",       default=None,
                     help="Key prefix 필터 (예: logs/2024/)")
    ana.add_argument("--sort-by",
                     choices=["size", "count", "prefix"], default="size")
    ana.add_argument("--top",           type=int, default=None,
                     help="상위 N개만 출력")
    ana.add_argument("--output", "-o",
                     choices=["table", "csv", "json"], default="table")
    ana.add_argument("--save",   "-s",  default=None,
                     help="결과 저장 경로")

    # 탐색 모드
    p.add_argument("--list-jobs",  action="store_true",
                   help="job ID / timestamp 목록만 출력하고 종료")
    p.add_argument("--info-only",  action="store_true",
                   help="데이터 로드 후 스키마/depth 분포만 출력")

    return p.parse_args()


def main():
    args = parse_args()

    # 연결 정보 검증
    if not args.endpoint:
        sys.exit("[ERROR] --endpoint 또는 환경변수 AISTOR_ENDPOINT 가 필요합니다.")
    if not args.access_key or not args.secret_key:
        sys.exit("[ERROR] --access-key / --secret-key 또는 환경변수가 필요합니다.")

    client = make_client(
        args.endpoint,
        args.access_key,
        args.secret_key,
        secure=not args.no_tls,
    )

    # ── --list-jobs ──────────────────────────────────────
    if args.list_jobs:
        print(f"\n[INFO] '{args.dest_bucket}' 버킷의 inventory job 목록 조회 중...")
        jobs = list_jobs(client, args.dest_bucket, args.prefix, args.source_bucket)
        if not jobs:
            print("[INFO] inventory job을 찾을 수 없습니다.")
            return
        print(f"\n{'Job ID':<40}  {'Timestamp (최신순)'}")
        print("─" * 72)
        for jid, timestamps in sorted(jobs.items()):
            for i, ts in enumerate(timestamps):
                label = jid if i == 0 else ""
                latest = " ← 최신" if i == 0 else ""
                print(f"{label:<40}  {ts}{latest}")
        print()
        return

    # ── inventory-id 필수 확인 ────────────────────────────
    if not args.inventory_id:
        sys.exit("[ERROR] --inventory-id 가 필요합니다. --list-jobs 로 목록을 먼저 확인하세요.")

    # ── timestamp 자동 선택 ───────────────────────────────
    timestamp = args.timestamp
    if not timestamp:
        print("[INFO] --timestamp 미지정 → 최신 timestamp 자동 선택 중...")
        jobs = list_jobs(client, args.dest_bucket, args.prefix, args.source_bucket)
        ts_list = jobs.get(args.inventory_id, [])
        if not ts_list:
            sys.exit(f"[ERROR] inventory-id '{args.inventory_id}' 를 찾을 수 없습니다.")
        timestamp = ts_list[0]
        print(f"[INFO] 선택된 timestamp: {timestamp}")

    # ── manifest 읽기 ─────────────────────────────────────
    manifest_key = resolve_manifest_key(
        args.prefix, args.source_bucket, args.inventory_id, timestamp
    )
    print(f"[INFO] manifest 읽기: {args.dest_bucket}/{manifest_key}")
    manifest = read_manifest(client, args.dest_bucket, manifest_key)

    fmt = manifest.get("fileFormat", "parquet").lower()
    if fmt != "parquet":
        sys.exit(f"[ERROR] 이 도구는 parquet 포맷만 지원합니다. (현재 포맷: {fmt})")

    print(f"[INFO] sourceBucket : {manifest.get('sourceBucket', '?')}")
    print(f"[INFO] inventoryId  : {manifest.get('inventoryId', '?')}")
    print(f"[INFO] createdAt    : {manifest.get('createdAt', '?')}")
    file_count = len(manifest.get("files", []))
    print(f"[INFO] parquet 파일 : {file_count}개\n")

    # ── parquet 키 목록 ───────────────────────────────────
    parquet_keys = resolve_parquet_keys(
        manifest, args.dest_bucket,
        args.prefix, args.source_bucket,
        args.inventory_id, timestamp,
    )

    # ── 스트리밍 로드 ─────────────────────────────────────
    df = load_all_parquets(client, args.dest_bucket, parquet_keys)

    # 타입 정규화
    df["Size"] = pd.to_numeric(df["Size"], errors="coerce").fillna(0)

    # ── --info-only ───────────────────────────────────────
    if args.info_only:
        print_info(df)
        return

    if not args.depth:
        sys.exit("[ERROR] --depth 가 필요합니다. --info-only 로 depth 분포를 먼저 확인하세요.")

    # ── 분석 & 출력 ───────────────────────────────────────
    result = analyze(
        df,
        depth=args.depth,
        bucket_filter=args.bucket_filter,
        prefix_filter=args.prefix_filter,
        sort_by=args.sort_by,
        top_n=args.top,
    )

    print_result(result, args.depth, args.output)

    if args.save and not result.empty:
        save_result(result, args.save, args.output)


if __name__ == "__main__":
    main()

0개의 댓글