26J02a1

Young-Kyoo Kim·2일 전
"""
[2단계] 날짜 + 클러스터(COMPUTE/STORAGE) 복합 파티셔닝 및 데이터 마감 정산 엔진 (ILM 외주화 버전)
"""
import os
import argparse
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path

import config
from config import (
    RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS, 
    get_workload_type, classify_node_infrastructure
)

KST = timezone(timedelta(hours=9))
W_CPU, W_MEM, W_PV = 1.0, 0.11, 0.02

def parse_arguments():
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    parser.add_argument("--cluster", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"])
    parser.add_argument("--workload-domain", type=str, default=None)
    return parser.parse_args()

def get_minio_client():
    return boto3.client(
        "s3", endpoint_url=os.getenv("MINIO_ENDPOINT"), aws_access_key_id=os.getenv("MINIO_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("MINIO_SECRET_KEY"), region_name="us-east-1",
        config=boto3.session.Config(signature_version="s3v4")
    )

def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    chunk_delta = timedelta(hours=1)
    current_start = start_dt
    print(f"\n📡 [AIStor 스마트 캐시 엔진] 원천 데이터 가용성 체크 시작...")

    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        f_name = f"prom_raw_{chunk_str}.parquet"
        object_key = f"raw/{f_name}"
        local_path = RAW_DIR / f_name
        
        if local_path.exists() and local_path.stat().st_size > 0:
            print(f"   ⚡ [Cache Hit] 로컬 캐시 자산 활용 (다운로드 스킵): {f_name}")
        else:
            try:
                s3_client.head_object(Bucket=bucket_name, Key=object_key)
                print(f"   📥 [Cache Miss] AIStor 백업으로부터 다운로드: {object_key}")
                s3_client.download_file(bucket_name, object_key, str(local_path))
            except ClientError as e:
                if e.response['Error']['Code'] != "404":
                    print(f"   ❌ AIStor 통신 실패 에러 ({f_name}): {str(e)}")
            except Exception as e:
                print(f"   ⚠️ 일반 다운로드 예외 발생 ({f_name}): {str(e)}")
                
        current_start += chunk_delta

def main():
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    now_kst = datetime.now(KST)
    
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)

    # 1. AIStor 동기화 작동
    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)

    raw_files = list(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("\n🚨 [중단] 정산 가동할 물리 원천 파일이 존재하지 않습니다.")
        return

    print(f"\n📊 [데이터 병합 및 인프라 매핑 일원화 스캔] {len(raw_files)}개 파티션 로드 중...")
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)
    
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y%m%d")
    df_raw = df_raw[(df_raw["timestamp"] >= start_dt.replace(tzinfo=None)) & (df_raw["timestamp"] < end_dt.replace(tzinfo=None))].reset_index(drop=True)
    if df_raw.empty: 
        print("⚠️  해당 기간 내 정산할 데이터 로우가 없습니다.")
        return

    # 인프라 위계 정적 매핑 분리
    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster_type"]    = [x[0] for x in infra_meta]  # COMPUTE 또는 STORAGE
    df_raw["workload_domain"] = [x[1] for x in infra_meta]
    df_raw["workload_type"]   = df_raw["pod"].apply(get_workload_type)

    # ─── 💡 [분할 정복 핵심 구역] 날짜 및 클러스터 타입별 루프 스플릿 연산 ───
    target_dates = df_raw["date"].unique()
    target_clusters = ["COMPUTE", "STORAGE"] if args.cluster.upper() == "ALL" else [args.cluster.upper()]

    print(f"🎯 분할 연산 타겟 스케줄러 가동 -> 일자 풀: {target_dates} | 클러스터 풀: {target_clusters}")

    for date_chunk in target_dates:
        for cluster_chunk in target_clusters:
            
            # 격리 필터링
            df_slice = df_raw[(df_raw["date"] == date_chunk) & (df_raw["cluster_type"] == cluster_chunk)]
            if df_slice.empty:
                continue

            print(f"   ⏳ [정산 컴파일 중] {date_chunk} ➡️ {cluster_chunk} 그룹 연산 기동...")

            if args.workload_domain:
                df_slice = df_slice[df_slice["workload_domain"] == args.workload_domain]
                if df_slice.empty: continue

            target_fields = ["cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled", "mem_request", "mem_limit", "mem_usage", "mem_rss", "oom_event", "pv_capacity", "pv_used"]
            for col in target_fields:
                if col not in df_slice.columns: df_slice[col] = 0.0

            # 그룹바이 집계 연산
            df_pod = df_slice.groupby(["date", "cluster_type", "workload_domain", "namespace", "workload_type", "node", "pod", "container"]).agg(
                minutes_running      = ("timestamp", "size"), cpu_request_max      = ("cpu_request", "max"), cpu_limit_max        = ("cpu_limit", "max"),
                cpu_usage_p95        = ("cpu_usage", lambda x: x.quantile(0.95)), cpu_throttled_max    = ("cpu_throttled", "max"),
                mem_request_max      = ("mem_request", "max"), mem_limit_max        = ("mem_limit", "max"), mem_usage_p95        = ("mem_usage", lambda x: x.quantile(0.95)),
                mem_rss_p95          = ("mem_rss", lambda x: x.quantile(0.95)), oom_strike_sum       = ("oom_event", "sum"),
                pv_capacity_max      = ("pv_capacity", "max"), pv_used_p95          = ("pv_used", lambda x: x.quantile(0.95))
            ).reset_index().fillna(0)

            # 단위 보정 (Bytes -> GB)
            for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]:
                df_pod[col] = df_pod[col] / (1024**3)

            # 시간 가중치 손실량 계산
            df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * (df_pod["minutes_running"] / 60.0)
            df_pod["cpu_usage_core_hours"]     = df_pod["cpu_usage_p95"] * (df_pod["minutes_running"] / 60.0)
            df_pod["cpu_waste_core_hours"]     = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
            df_pod["mem_allocated_gb_hours"]   = df_pod["mem_request_max"] * (df_pod["minutes_running"] / 60.0)
            df_pod["mem_usage_gb_hours"]       = df_pod["mem_usage_p95"] * (df_pod["minutes_running"] / 60.0)
            df_pod["mem_waste_gb_hours"]       = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)
            df_pod["pv_allocated_gb_hours"]    = df_pod["pv_capacity_max"] * (df_pod["minutes_running"] / 60.0)
            df_pod["pv_usage_gb_hours"]        = df_pod["pv_used_p95"] * (df_pod["minutes_running"] / 60.0)
            df_pod["pv_waste_gb_hours"]        = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

            df_pod["is_oom_killed"]     = df_pod["oom_strike_sum"] > 0
            df_pod["has_no_request"]    = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
            df_pod["has_no_limit"]      = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
            df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

            df_pod["status"] = np.where(df_pod["is_oom_killed"], "💥 OOM장애발생",
                                np.where((df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2), "⚠️ Request부족", 
                                np.where((df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50), "📉 과다할당", "✅ 최적화완료")))

            # ─── 💾 클러스터 그룹별 개별 물리 디렉토리 생성 및 다이렉트 바인딩 ───
            cluster_partition_dir = MERGED_DIR / cluster_chunk
            cluster_partition_dir.mkdir(parents=True, exist_ok=True)
            
            target_output_file = cluster_partition_dir / f"daily_enriched_{cluster_chunk}_{date_chunk}.parquet"
            df_pod.to_parquet(target_output_file, index=False)
            print(f"     💾 [저장 성료] 인프라 자산 정산 원부 생성됨 ➡️ {target_output_file.name}")

            # Namespace 정산 원부 롤업 백업
            df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
                cpu_used_ch=("cpu_usage_core_hours", "sum"), cpu_alloc_ch=("cpu_allocated_core_hours", "sum"), cpu_waste_ch=("cpu_waste_core_hours", "sum"),
                mem_used_gh=("mem_usage_gb_hours", "sum"), mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
                pv_used_gh=("pv_usage_gb_hours", "sum"), pv_alloc_gh=("pv_allocated_gb_hours", "sum")
            ).reset_index()
            df_daily_ns["final_usage_score"] = ((df_daily_ns["cpu_alloc_ch"] * W_CPU) + (df_daily_ns["mem_alloc_gh"] * W_MEM) + (df_daily_ns["pv_alloc_gh"] * W_PV)).round(1)
            df_daily_ns.to_parquet(cluster_partition_dir / f"daily_ns_usage_{cluster_chunk}_{date_chunk}.parquet", index=False)

            # 파레토 원부 롤업 백업
            df_ns = df_pod.groupby("namespace").agg(
                total_waste_core_hours=("cpu_waste_core_hours", "sum"), minutes_running_sum=("minutes_running", "sum"),
                container_cnt=("container", "count"), total_allocated_core_hours=("cpu_allocated_core_hours", "sum")
            ).reset_index().sort_values("total_waste_core_hours", ascending=False)
            global_total_waste = df_ns["total_waste_core_hours"].sum() or 0.1
            df_ns["waste_share_pct"] = (df_ns["total_waste_core_hours"] / global_total_waste * 100).round(2)
            df_ns["waste_cumsum_pct"] = df_ns["waste_share_pct"].cumsum().round(2)
            df_ns.to_parquet(cluster_partition_dir / f"pareto_ns_{cluster_chunk}_{date_chunk}.parquet", index=False)

    # ─── 🛡️ [변경사항 반영] 원천 데이터 강제 소거 루프(unlink) 원천 봉쇄 ───
    print("\nℹ️  [안내] 원천 RAW 파일 자동 삭제 세션이 종료되었습니다.")
    print("    - 로컬 디스크 및 AIStor S3 버킷의 RAW 데이터는 사내 스토리지 인프라 ILM 정책에 의해 자동 순환 소거됩니다.")

    print("\n🏁 === [Step2 분할 정복 배치 완료] 날짜 및 클러스터별 파티셔닝 구조가 완벽히 구축되었습니다. ===")

if __name__ == "__main__":
    main()

0개의 댓글