# 26Y01d3

# Young-Kyoo Kim · about 12 hours ago (약 12시간 전)
"""
[2단계] 로컬 캐시 스마트 활용 + 데이터 변환 완수 후 RAW 일괄 휘발 정제 엔진 (KeyError 완치판)
"""
import os
import argparse
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path

import config
from config import (
    RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS, 
    get_workload_type, classify_node_infrastructure
)

# Fixed UTC+09:00 offset attached to every timezone-aware datetime in this script.
KST = timezone(timedelta(hours=9))

# Multipliers applied to allocated CPU core-hours, memory GB-hours and
# PV GB-hours when main() computes ``final_usage_score``.
W_CPU = 1.0
W_MEM = 0.11
W_PV = 0.02

def parse_arguments(argv=None):
    """Parse the command-line options of the reprocessing engine.

    Args:
        argv: Optional sequence of argument strings. When ``None`` (the
            default) argparse reads ``sys.argv[1:]``, so existing
            zero-argument callers behave exactly as before. Passing an
            explicit list lets other code drive the parser programmatically.

    Returns:
        argparse.Namespace: with the attributes ``days``, ``start_date``,
        ``end_date``, ``cluster`` and ``workload_domain``.
    """
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument(
        "--days", type=int, default=None,
        help="Number of days to look back from now.",
    )
    parser.add_argument(
        "--start-date", type=str, default=None,
        help="First day of the window, formatted YYYY-MM-DD (use together with --end-date).",
    )
    parser.add_argument(
        "--end-date", type=str, default=None,
        help="Last day of the window, formatted YYYY-MM-DD (use together with --start-date).",
    )
    parser.add_argument(
        "--cluster", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"],
        help="Cluster whose nodes should be kept (default: ALL).",
    )
    parser.add_argument(
        "--workload-domain", type=str, default=None,
        help="Keep only rows classified into this workload domain.",
    )
    return parser.parse_args(argv)

def get_minio_client():
    """Create a boto3 S3 client for the endpoint described by environment variables.

    Reads ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY``
    from the environment; a variable that is not set is passed to boto3 as
    ``None``. The client is pinned to region ``us-east-1`` and the ``s3v4``
    signature version.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    client_options = {
        "endpoint_url": os.getenv("MINIO_ENDPOINT"),
        "aws_access_key_id": os.getenv("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": os.getenv("MINIO_SECRET_KEY"),
        "region_name": "us-east-1",
        "config": boto3.session.Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **client_options)

def filter_target_nodes(df, cluster_opt="ALL"):
    """Keep only the rows whose ``node`` value matches the selected cluster pattern.

    Args:
        df: DataFrame with a string ``node`` column.
        cluster_opt: Cluster selector, compared case-insensitively. ``"ALL"``
            matches any pattern in ``CLUSTER_NODE_PATTERNS``; any other value
            is looked up in that mapping and falls back to ``".*"`` when the
            key is unknown.

    Returns:
        The subset of ``df`` whose ``node`` matches the regex (case-insensitive).
        Rows with a missing ``node`` never match.
    """
    selector = cluster_opt.upper()
    if selector == "ALL":
        # Alternation of every known pattern: a node from any cluster passes.
        node_regex = "|".join(CLUSTER_NODE_PATTERNS.values())
    else:
        node_regex = CLUSTER_NODE_PATTERNS.get(selector, ".*")

    is_target = df["node"].str.contains(node_regex, regex=True, na=False, case=False)
    return df[is_target]

def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    """Make sure every hourly raw parquet file of the window is cached locally.

    Walks the half-open range ``[start_dt, end_dt)`` in one-hour steps. For
    each hour the file ``prom_raw_<YYYYmmdd_HH>.parquet`` is expected under
    ``RAW_DIR``; a non-empty local copy is reused, otherwise the object
    ``raw/<file name>`` is fetched from the bucket.

    Args:
        s3_client: S3 client exposing ``head_object`` and ``download_file``.
        bucket_name: Bucket that holds the ``raw/`` objects.
        start_dt: Inclusive start of the window (datetime).
        end_dt: Exclusive end of the window (datetime).

    Returns:
        None. Failures are printed and never raised, so one bad hour does not
        abort the rest of the synchronisation. Objects that do not exist in
        the bucket are skipped silently.
    """
    # Create the cache directory up front so a fresh workspace does not turn
    # every download into a "no such file or directory" warning.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    # Error codes that mean "the object is not there"; anything else is a
    # genuine communication problem worth reporting.
    missing_object_codes = {"404", "NoSuchKey", "NotFound"}

    hour_step = timedelta(hours=1)
    cursor = start_dt
    print("\n📡 [AIStor 스마트 캐시 엔진] 원천 데이터 가용성 체크 시작...")

    while cursor < end_dt:
        f_name = f"prom_raw_{cursor.strftime('%Y%m%d_%H')}.parquet"
        object_key = f"raw/{f_name}"
        local_path = RAW_DIR / f_name
        cursor += hour_step

        # A zero-byte file is treated as a miss and fetched again.
        if local_path.exists() and local_path.stat().st_size > 0:
            print(f"   ⚡ [Cache Hit] 로컬 캐시 자산 활용 (다운로드 스킵): {f_name}")
            continue

        try:
            # Probe first so the "Cache Miss" line is only printed for
            # objects that actually exist in the bucket.
            s3_client.head_object(Bucket=bucket_name, Key=object_key)
            print(f"   📥 [Cache Miss] AIStor 백업으로부터 다운로드: {object_key}")
            s3_client.download_file(bucket_name, object_key, str(local_path))
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code")
            if error_code not in missing_object_codes:
                print(f"   ❌ AIStor 통신 실패 에러 ({f_name}): {str(e)}")
        except Exception as e:
            # Best-effort by design: report and move on to the next hour.
            print(f"   ⚠️ 일반 다운로드 예외 발생 ({f_name}): {str(e)}")

def _resolve_time_window(args, now_kst):
    """Turn the CLI options into a half-open ``[start_dt, end_dt)`` KST window."""
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # The end date is inclusive for the user, so the window closes at the
        # following midnight.
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        # Default: the whole of yesterday. The window ends at the next
        # midnight (exclusive) rather than 23:59:59.999999, otherwise the run
        # label computed from ``end_dt - 1 day`` points at the day *before*
        # the start date.
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = start_dt + timedelta(days=1)
    return start_dt, end_dt


def _p95(series):
    """Return the 95th percentile of *series*."""
    return series.quantile(0.95)


def _build_pod_summary(df_raw):
    """Aggregate raw samples into one row per date/container with derived metrics."""
    target_fields = [
        "cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled",
        "mem_request", "mem_limit", "mem_usage", "mem_rss",
        "oom_event", "pv_capacity", "pv_used",
    ]
    # Metrics absent from the raw files are treated as zero so the
    # aggregation below never fails on a missing column.
    for col in target_fields:
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    # NOTE(review): groupby uses pandas' default dropna=True, so rows with a
    # missing value in any key column are left out of the summary.
    group_keys = ["date", "cluster", "workload_domain", "namespace", "workload_type", "node", "pod", "container"]
    df_pod = (
        df_raw.groupby(group_keys)
        .agg(
            minutes_running=("timestamp", "size"),
            cpu_request_max=("cpu_request", "max"),
            cpu_limit_max=("cpu_limit", "max"),
            cpu_usage_p95=("cpu_usage", _p95),
            cpu_throttled_max=("cpu_throttled", "max"),
            mem_request_max=("mem_request", "max"),
            mem_limit_max=("mem_limit", "max"),
            mem_usage_p95=("mem_usage", _p95),
            mem_rss_p95=("mem_rss", _p95),
            oom_strike_sum=("oom_event", "sum"),
            pv_capacity_max=("pv_capacity", "max"),
            pv_used_p95=("pv_used", _p95),
        )
        .reset_index()
        .fillna(0)
    )

    # Scale memory and volume figures by 1024**3 (presumably bytes -> GiB;
    # confirm the unit emitted by the collector).
    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    # The sample count is divided by 60 to obtain hours, i.e. the code
    # assumes one raw sample per minute — TODO confirm scrape interval.
    hours_running = df_pod["minutes_running"] / 60.0
    for prefix, unit, alloc_col, usage_col in (
        ("cpu", "core_hours", "cpu_request_max", "cpu_usage_p95"),
        ("mem", "gb_hours", "mem_request_max", "mem_usage_p95"),
        ("pv", "gb_hours", "pv_capacity_max", "pv_used_p95"),
    ):
        allocated = df_pod[alloc_col] * hours_running
        used = df_pod[usage_col] * hours_running
        df_pod[f"{prefix}_allocated_{unit}"] = allocated
        df_pod[f"{prefix}_usage_{unit}"] = used
        # Waste is never negative: usage above the allocation counts as zero waste.
        df_pod[f"{prefix}_waste_{unit}"] = (allocated - used).clip(lower=0)

    # Flag columns (the original comment says the step-6 Excel summary and
    # incident detection rely on them).
    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

    # First matching rule wins: OOM > under-requested > over-allocated > OK.
    under_requested = (df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2)
    over_allocated = (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50)
    df_pod["status"] = np.where(
        df_pod["is_oom_killed"], "💥 OOM장애발생",
        np.where(under_requested, "⚠️ Request부족",
                 np.where(over_allocated, "📉 과다할당", "✅ 최적화완료")),
    )
    return df_pod


def _write_namespace_rollups(df_pod):
    """Write the daily per-namespace usage table and the namespace Pareto table."""
    # Daily roll-up per namespace.
    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch=("cpu_usage_core_hours", "sum"),
        cpu_alloc_ch=("cpu_allocated_core_hours", "sum"),
        cpu_waste_ch=("cpu_waste_core_hours", "sum"),
        mem_used_gh=("mem_usage_gb_hours", "sum"),
        mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
        pv_used_gh=("pv_usage_gb_hours", "sum"),
        pv_alloc_gh=("pv_allocated_gb_hours", "sum"),
    ).reset_index()
    weighted_allocation = (
        (df_daily_ns["cpu_alloc_ch"] * W_CPU)
        + (df_daily_ns["mem_alloc_gh"] * W_MEM)
        + (df_daily_ns["pv_alloc_gh"] * W_PV)
    )
    df_daily_ns["final_usage_score"] = weighted_allocation.round(1)
    df_daily_ns.to_parquet(MERGED_DIR / "daily_ns_usage.parquet", index=False)

    # Pareto roll-up: namespaces ordered by wasted CPU core-hours.
    df_ns = df_pod.groupby("namespace").agg(
        total_waste_core_hours=("cpu_waste_core_hours", "sum"),
        minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"),
        total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
    ).reset_index().sort_values("total_waste_core_hours", ascending=False)
    # Fall back to 0.1 so a run without any waste does not divide by zero.
    global_total_waste = df_ns["total_waste_core_hours"].sum() or 0.1
    exact_share_pct = df_ns["total_waste_core_hours"] / global_total_waste * 100
    df_ns["waste_share_pct"] = exact_share_pct.round(2)
    # Accumulate the exact shares and round once; summing already-rounded
    # shares lets the rounding error grow with the number of namespaces.
    df_ns["waste_cumsum_pct"] = exact_share_pct.cumsum().round(2)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)


def _purge_raw_cache(raw_files):
    """Delete the given local raw files, reporting (not raising) any failure."""
    for raw_file in raw_files:
        try:
            raw_file.unlink()
        except FileNotFoundError:
            # Already gone: nothing left to clean for this entry.
            continue
        except OSError as exc:
            # Still best-effort, but leave a trace instead of failing silently.
            print(f"   ⚠️ RAW 캐시 삭제 실패 ({raw_file.name}): {exc}")


def main():
    """Run the step-2 reprocessing pipeline end to end.

    Steps:
        1. Resolve the time window from the CLI options and make sure the
           hourly raw parquet files of that window are cached locally.
        2. Load every ``prom_raw_*.parquet`` file found in ``RAW_DIR``, keep
           the rows inside the window and belonging to the selected
           cluster / workload domain.
        3. Aggregate per container and day, derive allocation / usage / waste
           figures and status flags, and write ``enriched_fixed_7d.parquet``.
        4. Write the namespace roll-ups (``daily_ns_usage.parquet``,
           ``pareto_fixed_ns.parquet``) and ``meta_run_info.txt``.
        5. Delete the local raw files that were loaded.

    Returns:
        None. The function returns early, without writing outputs or cleaning
        the cache, when no data is left after any of the filters.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    start_dt, end_dt = _resolve_time_window(args, datetime.now(KST))

    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)

    # Every cached raw file is loaded (and later purged), not only the ones
    # belonging to the window; the timestamp filter below narrows the rows.
    raw_files = list(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("\n🚨 [중단] 정산 가동할 물리 원천 파일이 존재하지 않습니다.")
        return

    print(f"\n📊 [데이터 병합] {len(raw_files)}개 시계열 파티션 로드 중...")
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)

    # NOTE(review): the bounds are made naive before comparing, which assumes
    # the ``timestamp`` column is timezone-naive KST wall-clock time — confirm
    # against the collector.
    window_start = start_dt.replace(tzinfo=None)
    window_end = end_dt.replace(tzinfo=None)
    in_window = (df_raw["timestamp"] >= window_start) & (df_raw["timestamp"] < window_end)
    df_raw = df_raw[in_window].reset_index(drop=True)
    if df_raw.empty:
        print("\n🚨 [중단] 지정 기간에 해당하는 원천 데이터가 없습니다.")
        return
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")

    # ``.copy()`` detaches the filtered frame so the column assignments below
    # do not trigger pandas' SettingWithCopyWarning.
    df_raw = filter_target_nodes(df_raw, cluster_opt=args.cluster).copy()
    if df_raw.empty:
        print("\n🚨 [중단] 대상 클러스터 노드에 해당하는 데이터가 없습니다.")
        return

    # classify_node_infrastructure is indexed as (cluster, workload_domain).
    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster"] = [meta[0] for meta in infra_meta]
    df_raw["workload_domain"] = [meta[1] for meta in infra_meta]

    if args.workload_domain:
        df_raw = df_raw[df_raw["workload_domain"] == args.workload_domain].copy()
        if df_raw.empty:
            # Same policy as the guards above: nothing to aggregate, stop here.
            print("\n🚨 [중단] 지정한 워크로드 도메인에 해당하는 데이터가 없습니다.")
            return

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)

    df_pod = _build_pod_summary(df_raw)

    # Make sure the output directory exists before the first write.
    MERGED_DIR.mkdir(parents=True, exist_ok=True)
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)
    _write_namespace_rollups(df_pod)

    # Run label "<first day>_to_<last day>"; ``end_dt`` is exclusive, hence
    # the one-day step back.
    # NOTE(review): with --days the window ends at "now", so the label stops
    # at yesterday although today's partial data is included — confirm intent.
    date_label = start_dt.strftime("%Y%m%d") + "_to_" + (end_dt - timedelta(days=1)).strftime("%Y%m%d")
    target_info = args.workload_domain if args.workload_domain else args.cluster
    # Explicit encoding so a non-ASCII workload domain is written the same
    # way on every platform.
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")

    print(f"💾 요약 분석 가공 원부 배치 마감 완료. (위계: {target_info})")

    # Reprocessing is complete, so the local raw cache can be removed.
    print("\n✂️  [작업 공간 최적화] 다음 단계(차트/엑셀)를 위해 로컬 RAW 파일을 일괄 청소합니다.")
    _purge_raw_cache(raw_files)

    print("\n🏁 === [Step2 배포 마감] 로컬 무상태 디스크 무결성이 완벽히 복구되었습니다. ===")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

# 0 comments (0개의 댓글)