# Snippet 26Y01b1
#
# Posted by Young-Kyoo Kim, about 14 hours ago
"""
[Step 2] Node regex filter + cardinality compression + 11-metric aggregation + [new] daily per-namespace consolidated usage settlement engine.

Run (default daily batch): python step2_pipeline.py
Run (ad-hoc request): python step2_pipeline.py --cluster-type COMPUTE --start-date 2026-06-01 --end-date 2026-06-07
"""

import os
import re
import argparse
import boto3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path

# config.py 설정 원부로부터 매트릭 구조 로드
from config import (
    RAW_DIR, MERGED_DIR, MINIO_RAW_BUCKET, 
    CLUSTER_NODE_PATTERNS, get_workload_type, classify_cluster_infrastructure
)

# Fixed UTC+09:00 offset; main() uses it to build the processing window.
KST = timezone(timedelta(hours=9))

# --- [Weight settings] Adjust to match the in-house cost allocation and settlement policy ---
# The weights are applied to allocated core-hours / GB-hours when the daily
# per-namespace usage score is computed.
W_CPU = 1.0     # 1 Core-Hour = 1.0 point
W_MEM = 0.11    # 1 GB-Hour   = 0.11 points
W_PV  = 0.02    # 1 GB-Hour   = 0.02 points

def parse_arguments(argv=None):
    """Parse and validate the command-line options of the pipeline.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``, which is what the script entry
            point relies on; passing a list makes the parser usable from
            tests and other callers.

    Returns:
        argparse.Namespace with ``days`` (int or None), ``start_date`` and
        ``end_date`` (``YYYY-MM-DD`` strings or None), ``cluster_type``
        (``COMPUTE``, ``STORAGE`` or ``ALL``) and ``cluster`` (str or None).

    Raises:
        SystemExit: raised by argparse (exit status 2) when an option is
            malformed, when only one of ``--start-date`` / ``--end-date`` is
            given, when the range is reversed, or when ``--days`` is negative.
    """

    def _iso_date(value):
        # Validate only; the string is returned unchanged because the caller
        # parses it again and also embeds it verbatim in the run label.
        try:
            datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"expected YYYY-MM-DD, got {value!r}"
            ) from None
        return value

    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument(
        "--days", type=int, default=None,
        help="look back this many days from today (ignored when a date range is given)",
    )
    parser.add_argument(
        "--start-date", type=_iso_date, default=None,
        help="first day of the range, YYYY-MM-DD (requires --end-date)",
    )
    parser.add_argument(
        "--end-date", type=_iso_date, default=None,
        help="last day of the range, inclusive, YYYY-MM-DD (requires --start-date)",
    )
    parser.add_argument(
        "--cluster-type", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"],
        help="restrict processing to nodes of one cluster type (default: ALL)",
    )
    parser.add_argument(
        "--cluster", type=str, default=None,
        help="restrict processing to a single cluster name",
    )
    args = parser.parse_args(argv)

    # A lone bound used to be ignored silently, so the run fell back to a
    # different window than the one the operator asked for.
    if bool(args.start_date) != bool(args.end_date):
        parser.error("--start-date and --end-date must be given together")
    if args.start_date and args.end_date:
        first_day = datetime.strptime(args.start_date, "%Y-%m-%d")
        last_day = datetime.strptime(args.end_date, "%Y-%m-%d")
        if first_day > last_day:
            parser.error("--start-date must not be later than --end-date")
    if args.days is not None and args.days < 0:
        parser.error("--days must not be negative")

    return args

def get_minio_client():
    """Build a boto3 S3 client pointed at the MinIO endpoint.

    The endpoint and credentials are read from the ``MINIO_ENDPOINT``,
    ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY`` environment variables; an
    unset variable is passed through as ``None``. Requests are signed with
    the ``s3v4`` signature version.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    read_env = os.getenv
    signing_config = boto3.session.Config(signature_version="s3v4")
    return boto3.client(
        "s3",
        endpoint_url=read_env("MINIO_ENDPOINT"),
        aws_access_key_id=read_env("MINIO_ACCESS_KEY"),
        aws_secret_access_key=read_env("MINIO_SECRET_KEY"),
        config=signing_config,
    )

def filter_target_nodes(df, cluster_type="ALL"):
    """Keep only the rows whose ``node`` value matches the configured patterns.

    Args:
        df: DataFrame with a string ``node`` column.
        cluster_type: Key into ``CLUSTER_NODE_PATTERNS`` (upper-cased before
            the lookup), or ``"ALL"`` to match the alternation of every
            configured pattern. An unknown key falls back to ``".*"``.

    Returns:
        A new DataFrame with a fresh 0..n-1 index holding the matching rows.
        Matching is case-insensitive and rows with a missing ``node`` never
        match. Progress lines are printed to stdout.
    """
    selected = cluster_type.upper()
    rows_before = len(df)
    print(f"\n⚙️  [인프라 필터 가동] 선택 옵션: [{selected} CLUSTER]")

    if selected != "ALL":
        node_regex = CLUSTER_NODE_PATTERNS.get(selected, ".*")
        print(f"🎯 [{selected}] 전용 노드이름 형식 정밀 스캔 기동: '{node_regex}'")
    else:
        node_regex = "|".join(CLUSTER_NODE_PATTERNS.values())
        print(f"🔍 전사 클러스터 통합 정규식 스캔 기동: '{node_regex}'")

    # Both modes apply the same filter; only the pattern differs.
    keep_mask = df["node"].str.contains(node_regex, regex=True, na=False, case=False)
    kept = df[keep_mask].reset_index(drop=True)

    rows_after = len(kept)
    print(f"✂️  [필터 스캔 마감] 원천 {rows_before:,}행 중 타겟 외 {rows_before - rows_after:,}행 제거 ➡️ {rows_after:,}행 생존")
    return kept

def sync_down_raw_from_minio(s3_client, start_dt, end_dt):
    """Download the raw Parquet partitions for ``[start_dt, end_dt)`` into RAW_DIR.

    The window is walked in 6-hour steps starting at ``start_dt``. For each
    step two object names are tried under the ``raw/`` prefix of
    ``MINIO_RAW_BUCKET``: ``prom_raw_<YYYYmmdd_HH>.parquet`` and its
    ``_active`` variant.

    * A non-``_active`` file that already exists locally with a non-zero size
      is kept as is, and no request is sent for it.
    * ``_active`` files are downloaded again on every run.
    * The sync is best effort: an object that does not exist is skipped
      silently, any other failure is reported on stdout and the loop goes on.

    Args:
        s3_client: boto3 S3 client (needs ``head_object`` and ``download_file``).
        start_dt: Start of the window (datetime); its formatted value names
            the first partition.
        end_dt: Exclusive end of the window (datetime).
    """
    chunk_delta = timedelta(hours=6)
    # Error codes that only mean "this partition was never uploaded".
    missing_codes = {"404", "NoSuchKey", "NotFound"}
    print(f"🪣 [MinIO 레이크 리싱크] {start_dt.strftime('%m-%d')} ~ {end_dt.strftime('%m-%d')} 파티션 다운로드 스캔...")

    # download_file cannot create the target directory itself.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    current_start = start_dt
    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        for f_name in (f"prom_raw_{chunk_str}.parquet", f"prom_raw_{chunk_str}_active.parquet"):
            local_path = RAW_DIR / f_name
            is_active = "_active" in f_name
            # Check the local copy first so finalized partitions cost no request.
            if not is_active and local_path.exists() and local_path.stat().st_size > 0:
                continue

            object_key = f"raw/{f_name}"
            try:
                s3_client.head_object(Bucket=MINIO_RAW_BUCKET, Key=object_key)
                s3_client.download_file(MINIO_RAW_BUCKET, object_key, str(local_path))
            except Exception as exc:  # best effort: one bad object must not stop the batch
                # botocore's ClientError carries the service error code in
                # ``response``; other exception types simply have no code.
                error_info = (getattr(exc, "response", None) or {}).get("Error", {})
                if str(error_info.get("Code", "")) not in missing_codes:
                    print(f"⚠️  [MinIO 동기화 경고] {object_key} 다운로드 실패: {exc!r}")
        current_start += chunk_delta

def _resolve_time_window(args, now_kst):
    """Return ``(start_dt, end_dt, date_label)`` for the requested run mode.

    All three modes produce a half-open window ``[start_dt, end_dt)`` of
    KST-aware datetimes:

    * explicit range: ``--start-date`` 00:00 up to the day after ``--end-date``;
    * ``--days N``: midnight N days ago up to ``now_kst``;
    * default: the whole of yesterday.
    """
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
        return start_dt, end_dt, f"{args.start_date}_to_{args.end_date}"

    if args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        return start_dt, now_kst, f"recent_{args.days}days"

    yesterday = (now_kst - timedelta(days=1)).date()
    start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
    # Next midnight as the exclusive bound, so the last microsecond of the day
    # is not dropped by the ``<`` comparison.
    return start_dt, start_dt + timedelta(days=1), yesterday.strftime("%Y-%m-%d")


def _aggregate_pod_metrics(df_raw):
    """Collapse raw samples into one row per date/cluster/namespace/node/pod/container.

    Adds the aggregated metrics, the derived core-hour / GB-hour columns, the
    governance flags and the ``status`` label. Metric columns missing from
    ``df_raw`` are created as 0.0 on the passed frame.
    """
    target_fields = [
        "cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled",
        "mem_request", "mem_limit", "mem_usage", "mem_rss",
        "oom_event", "pv_capacity", "pv_used",
    ]
    for col in target_fields:
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    def _p95(series):
        return series.quantile(0.95)

    group_keys = ["date", "cluster", "cluster_type", "namespace", "workload_type", "node", "pod", "container"]
    # NOTE(review): groupby drops rows whose key columns contain NaN (pandas
    # default dropna=True) — confirm that every sample carries all key labels.
    df_pod = df_raw.groupby(group_keys).agg(
        minutes_running   = ("timestamp", "size"),
        cpu_request_max   = ("cpu_request", "max"),
        cpu_limit_max     = ("cpu_limit", "max"),
        cpu_usage_p95     = ("cpu_usage", _p95),
        cpu_throttled_max = ("cpu_throttled", "max"),
        mem_request_max   = ("mem_request", "max"),
        mem_limit_max     = ("mem_limit", "max"),
        mem_usage_p95     = ("mem_usage", _p95),
        mem_rss_p95       = ("mem_rss", _p95),
        oom_strike_sum    = ("oom_event", "sum"),
        pv_capacity_max   = ("pv_capacity", "max"),
        pv_used_p95       = ("pv_used", _p95),
    ).reset_index().fillna(0)

    # Unit normalisation (bytes -> GB, binary gigabytes).
    byte_to_gb_cols = ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]
    df_pod[byte_to_gb_cols] = df_pod[byte_to_gb_cols] / (1024**3)

    # Time weighting (core-hours and GB-hours).
    # NOTE(review): the sample count is used as minutes, which presumably
    # relies on one sample per minute — confirm the scrape interval.
    hours_running = df_pod["minutes_running"] / 60.0

    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours_running
    df_pod["cpu_usage_core_hours"]     = df_pod["cpu_usage_p95"] * hours_running
    df_pod["cpu_waste_core_hours"]     = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)

    df_pod["mem_allocated_gb_hours"]   = df_pod["mem_request_max"] * hours_running
    df_pod["mem_usage_gb_hours"]       = df_pod["mem_usage_p95"] * hours_running
    df_pod["mem_waste_gb_hours"]       = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)

    df_pod["pv_allocated_gb_hours"]    = df_pod["pv_capacity_max"] * hours_running
    df_pod["pv_usage_gb_hours"]        = df_pod["pv_used_p95"] * hours_running
    df_pod["pv_waste_gb_hours"]        = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

    # Governance status; the first matching rule wins (OOM > under-requested > over-allocated).
    under_requested = (df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2)
    over_allocated = (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50)
    df_pod["status"] = np.where(
        df_pod["is_oom_killed"], "💥 OOM장애발생",
        np.where(
            under_requested, "⚠️ Request부족",
            np.where(over_allocated, "📉 과다할당", "✅ 최적화완료"),
        ),
    )
    return df_pod


def _build_daily_namespace_usage(df_pod):
    """Sum pod-level hours per (date, namespace) and add the weighted usage score."""
    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch  = ("cpu_usage_core_hours", "sum"),
        cpu_alloc_ch = ("cpu_allocated_core_hours", "sum"),
        cpu_waste_ch = ("cpu_waste_core_hours", "sum"),
        mem_used_gh  = ("mem_usage_gb_hours", "sum"),
        mem_alloc_gh = ("mem_allocated_gb_hours", "sum"),
        pv_used_gh   = ("pv_usage_gb_hours", "sum"),
        pv_alloc_gh  = ("pv_allocated_gb_hours", "sum"),
    ).reset_index()

    # Weighted sum of the allocated CPU / memory / PV hours.
    df_daily_ns["final_usage_score"] = (
        (df_daily_ns["cpu_alloc_ch"] * W_CPU) +
        (df_daily_ns["mem_alloc_gh"] * W_MEM) +
        (df_daily_ns["pv_alloc_gh"]  * W_PV)
    ).round(1)
    return df_daily_ns


def _build_namespace_pareto(df_pod):
    """Rank namespaces by wasted CPU core-hours with share and cumulative share."""
    df_ns = df_pod.groupby("namespace").agg(
        minutes_running_sum = ("minutes_running", "sum"),
        container_cnt       = ("container", "count"),
        total_allocated_core_hours = ("cpu_allocated_core_hours", "sum"),
        total_waste_core_hours     = ("cpu_waste_core_hours", "sum"),
    ).reset_index().sort_values(by="total_waste_core_hours", ascending=False).reset_index(drop=True)

    total_waste = df_ns["total_waste_core_hours"].sum()
    # Avoid dividing by zero when nothing is wasted; every share is then 0.
    denominator = total_waste if total_waste > 0 else 0.1
    share_pct = df_ns["total_waste_core_hours"] / denominator * 100
    df_ns["waste_share_pct"] = share_pct.round(2)
    # Accumulate the unrounded shares so rounding errors do not add up.
    df_ns["waste_cumsum_pct"] = share_pct.cumsum().round(2)
    return df_ns


def main():
    """Run the step-2 pipeline: sync, filter, aggregate and write the reports.

    Steps:
        1. Resolve the processing window from the CLI options.
        2. Sync the raw partitions from MinIO and load every
           ``prom_raw_*.parquet`` file found in ``RAW_DIR``.
        3. Keep the samples inside the window and on the target nodes, then
           tag them with cluster and workload type.
        4. Mask Spark executor / Airflow worker pod and container names to
           reduce cardinality.
        5. Write ``enriched_fixed_7d.parquet``, ``daily_ns_usage.parquet``,
           ``pareto_fixed_ns.parquet`` and ``meta_run_info.txt`` into
           ``MERGED_DIR``.

    The function returns early, without writing anything, when there are no
    raw files or when a filter leaves no rows.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    start_dt, end_dt, date_label = _resolve_time_window(args, datetime.now(KST))

    sync_down_raw_from_minio(s3_client, start_dt, end_dt)

    # Sorted for a deterministic load order.
    # NOTE(review): this loads every local raw file, finalized and ``_active``
    # alike, and relies on the timestamp filter below — confirm that the two
    # variants of one partition never hold the same samples.
    raw_files = sorted(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("⚠️ 처리할 원천 Parquet 파일이 레이크하우스에 없습니다.")
        return

    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)

    # NOTE(review): the bounds are compared as naive datetimes, so the
    # ``timestamp`` column is presumably stored as naive KST — confirm.
    window_start = start_dt.replace(tzinfo=None)
    window_end = end_dt.replace(tzinfo=None)
    in_window = (df_raw["timestamp"] >= window_start) & (df_raw["timestamp"] < window_end)
    df_raw = df_raw[in_window].reset_index(drop=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")

    df_raw = filter_target_nodes(df_raw, cluster_type=args.cluster_type)
    if df_raw.empty:
        print("⚠️ 필터를 통과한 노드가 존재하지 않아 파이프라인을 조기 마감합니다.")
        return

    cluster_meta = df_raw["node"].apply(classify_cluster_infrastructure)
    df_raw["cluster"] = [meta[0] for meta in cluster_meta]
    df_raw["cluster_type"] = [meta[1] for meta in cluster_meta]

    if args.cluster:
        df_raw = df_raw[df_raw["cluster"] == args.cluster].reset_index(drop=True)
        if df_raw.empty:
            # Same early exit as the node filter: do not overwrite the
            # previous reports with empty ones.
            print(f"⚠️ 클러스터 [{args.cluster}]에 해당하는 데이터가 없어 파이프라인을 조기 마감합니다.")
            return

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)

    # Mask short-lived batch pods so they collapse into one group per pool.
    is_spark = df_raw["workload_type"] == "SPARK_EXECUTOR"
    is_airflow = df_raw["workload_type"] == "AIRFLOW_WORKER"
    df_raw["pod"] = np.where(is_spark, "spark-executor-pool", df_raw["pod"])
    df_raw["pod"] = np.where(is_airflow, "airflow-worker-pool", df_raw["pod"])
    df_raw["container"] = np.where(is_spark, "executor", df_raw["container"])
    df_raw["container"] = np.where(is_airflow, "worker", df_raw["container"])

    MERGED_DIR.mkdir(parents=True, exist_ok=True)

    print("📊 고정밀 시계열 기반 팟 단위 다차원 FinOps 집계 연산 가동...")
    df_pod = _aggregate_pod_metrics(df_raw)
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)

    # Daily per-namespace settlement metrics and consolidated usage score.
    print("✨ [FinOps 마스터] 표준 가중합 모델 기반 일단위 Namespace 자원 소모 점수 정산 중...")
    df_daily_ns = _build_daily_namespace_usage(df_pod)
    df_daily_ns.to_parquet(MERGED_DIR / "daily_ns_usage.parquet", index=False)

    # Namespace Pareto summary.
    df_ns = _build_namespace_pareto(df_pod)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)

    target_info = args.cluster if args.cluster else args.cluster_type
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")

    print(f"💾 [{target_info}] 전용 포맷 기준 일단위 테넌트 정산 Parquet 아카이빙 완수.\n")

if __name__ == "__main__":
    main()

# 0 comments