26J30m2

Young-Kyoo Kim·3일 전
"""
[2단계] 동적 인프라 정규식 필터링 및 카디널리티 압축 엔진
실행 (전체): python step2_pipeline.py
실행 (연산전용 포맷만): python step2_pipeline.py --cluster-type COMPUTE
실행 (스토리지포맷만): python step2_pipeline.py --cluster-type STORAGE
"""

import os
import re
import argparse
import boto3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path

# config.py 로드
from config import (
    RAW_DIR, MERGED_DIR, MINIO_RAW_BUCKET, 
    CLUSTER_NODE_PATTERNS, get_workload_type, classify_cluster_infrastructure
)

KST = timezone(timedelta(hours=9))

def parse_arguments():
    parser = argparse.ArgumentParser(description="FinOps Dynamic Gatekeeper Engine")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    # 옵션 선택 레이어 (안하면 ALL이 기본값)
    parser.add_argument("--cluster-type", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"])
    parser.add_argument("--cluster", type=str, default=None)
    return parser.parse_args()

def get_minio_client():
    return boto3.client(
        "s3", endpoint_url=os.getenv("MINIO_ENDPOINT"),
        aws_access_key_id=os.getenv("MINIO_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("MINIO_SECRET_KEY"),
        config=boto3.session.Config(signature_version="s3v4")
    )

def filter_target_nodes(df, cluster_type="ALL"):
    """
    🌐 [동적 노드이름 포맷 필터] 
    선택한 클러스터 유형에 매핑된 config.py의 정규식 규격만 메모리 입구에서 통과시킵니다.
    """
    c_type = cluster_type.upper()
    initial_rows = len(df)
    
    print(f"\n⚙️  [인프라 필터 가동] 선택 옵션: [{c_type} CLUSTER]")
    
    # ─── 💡 [핵심 구현 분기] 옵션에 따른 정규식 동적 매핑 ───
    if c_type == "ALL":
        # COMPUTE와 STORAGE 정규식을 파이프(|) 기호로 결합하여 사내 전체 노드 포맷 수용
        combined_pattern = "|".join(CLUSTER_NODE_PATTERNS.values())
        print(f"🔍 전사 클러스터 통합 정규식 스캔 기동: '{combined_pattern}'")
        df_filtered = df[df["node"].str.contains(combined_pattern, regex=True, na=False, case=False)].reset_index(drop=True)
    else:
        # 콕 집어 선택한 클러스터 포맷만 저격 필터링
        target_pattern = CLUSTER_NODE_PATTERNS.get(c_type, ".*")
        print(f"🎯 [{c_type}] 전용 노드이름 형식 정밀 스캔 기동: '{target_pattern}'")
        df_filtered = df[df["node"].str.contains(target_pattern, regex=True, na=False, case=False)].reset_index(drop=True)

    dropped_rows = initial_rows - len(df_filtered)
    print(f"✂️  [필터 스캔 마감] 원천 {initial_rows:,}행 중 타겟 외 {dropped_rows:,}행 노이즈 즉시 사살 ➡️ {len(df_filtered):,}행 생존\n")
    return df_filtered

def sync_down_raw_from_minio(s3_client, start_dt, end_dt):
    chunk_delta = timedelta(hours=6)
    current_start = start_dt
    print(f"🪣 [MinIO 레이크 리싱크] {start_dt.strftime('%m-%d')} ~ {end_dt.strftime('%m-%d')} 파일 스캔...")

    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        for f_name in [f"prom_raw_{chunk_str}.parquet", f"prom_raw_{chunk_str}_active.parquet"]:
            local_path = RAW_DIR / f_name
            try:
                s3_client.head_object(Bucket=MINIO_RAW_BUCKET, Key=f"raw/{f_name}")
                if not (local_path.exists() and local_path.stat().st_size > 0 and "_active" not in f_name):
                    s3_client.download_file(MINIO_RAW_BUCKET, f"raw/{f_name}", str(local_path))
            except Exception:
                pass
        current_start += chunk_delta

def main():
    args = parse_arguments()
    s3_client = get_minio_client()
    now_kst = datetime.now(KST)
    
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
        date_label = f"{args.start_date}_to_{args.end_date}"
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
        date_label = f"recent_{args.days}days"
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(yesterday, datetime.max.time()).replace(tzinfo=KST)
        date_label = yesterday.strftime("%Y-%m-%d")

    sync_down_raw_from_minio(s3_client, start_dt, end_dt)

    raw_files = list(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("⚠️ 처리할 원천 Parquet 파일이 레이크하우스에 없습니다.")
        return

    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")
    
    # 타임스탬프 윈도우 락 클리핑
    df_raw = df_raw[(df_raw["timestamp"] >= start_dt.replace(tzinfo=None)) & (df_raw["timestamp"] < end_dt.replace(tzinfo=None))].reset_index(drop=True)

    # ─── 🛡️ [고도화 반영] 선택 조건에 따른 노드 이름 형식 메모리 입구 정밀 필터링 ───
    df_raw = filter_target_nodes(df_raw, cluster_type=args.cluster_type)
    if df_raw.empty:
        print("⚠️ 필터를 통과한 노드가 존재하지 않아 파이프라인을 조기 마감합니다.")
        return

    # 후속 물리 분류 매핑
    cluster_meta = df_raw["node"].apply(classify_cluster_infrastructure)
    df_raw["cluster"] = [x[0] for x in cluster_meta]
    df_raw["cluster_type"] = [x[1] for x in cluster_meta]

    # 세부 타겟팅 파라미터 락
    if args.cluster:
        df_raw = df_raw[df_raw["cluster"] == args.cluster].reset_index(drop=True)

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)

    # 카디널리티 마스킹 압축화
    df_raw["pod"] = np.where(df_raw["workload_type"] == "SPARK_EXECUTOR", "spark-executor-pool", df_raw["pod"])
    df_raw["pod"] = np.where(df_raw["workload_type"] == "AIRFLOW_WORKER", "airflow-worker-pool", df_raw["pod"])
    df_raw["container"] = np.where(df_raw["workload_type"] == "SPARK_EXECUTOR", "executor", df_raw["container"])
    df_raw["container"] = np.where(df_raw["workload_type"] == "AIRFLOW_WORKER", "worker", df_raw["container"])

    for col in ["cpu_limit", "mem_limit", "oom_event"]:
        if col not in df_raw.columns: df_raw[col] = 0.0

    print("📊 고정밀 시계열 기반 팟 단위 리밸런싱 집계 가동...")
    df_pod = df_raw.groupby(["date", "cluster", "cluster_type", "namespace", "workload_type", "node", "pod", "container"]).agg(
        minutes_running      = ("timestamp", "size"),
        cpu_request_max      = ("cpu_request", "max"),
        cpu_limit_max        = ("cpu_limit", "max"),
        cpu_usage_p95        = ("cpu_usage", lambda x: x.quantile(0.95)),
        mem_request_max      = ("mem_request", "max"),
        mem_limit_max        = ("mem_limit", "max"),
        mem_usage_p95        = ("mem_usage", lambda x: x.quantile(0.95)),
        oom_strike_sum       = ("oom_event", "sum")
    ).reset_index().fillna(0)

    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * (df_pod["minutes_running"] / 60.0)
    df_pod["cpu_usage_core_hours"]     = df_pod["cpu_usage_p95"] * (df_pod["minutes_running"] / 60.0)
    df_pod["cpu_waste_core_hours"]     = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)

    df_pod["mem_allocated_gb_hours"]   = df_pod["mem_request_max"] * (df_pod["minutes_running"] / 60.0)
    df_pod["mem_usage_gb_hours"]       = df_pod["mem_usage_p95"] * (df_pod["minutes_running"] / 60.0)
    df_pod["mem_waste_gb_hours"]       = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)

    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)
    df_pod["status"] = np.where(df_pod["is_oom_killed"], "💥 OOM장애발생",
                        np.where(df_pod["cpu_shortage_cores"] > 0.5, "⚠️ Request부족", 
                        np.where(df_pod["cpu_waste_core_hours"] > 10, "📉 과다할당", "✅ 최적화완료")))

    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)

    df_ns = df_pod.groupby("namespace").agg(
        minutes_running_sum = ("minutes_running", "sum"),
        container_cnt       = ("container", "count"),
        total_allocated_core_hours = ("cpu_allocated_core_hours", "sum"),
        total_waste_core_hours     = ("cpu_waste_core_hours", "sum")
    ).reset_index().sort_values(by="total_waste_core_hours", ascending=False).reset_index(drop=True)

    global_total_waste = df_ns["total_waste_core_hours"].sum() if df_ns["total_waste_core_hours"].sum() > 0 else 0.1
    df_ns["waste_share_pct"] = (df_ns["total_waste_core_hours"] / global_total_waste * 100).round(2)
    df_ns["waste_cumsum_pct"] = df_ns["waste_share_pct"].cumsum().round(2)
    
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)
    
    with open(MERGED_DIR / "meta_run_info.txt", "w") as mf:
        target_info = args.cluster if args.cluster else args.cluster_type
        mf.write(f"{target_info}@{date_label}")

    print(f"💾 [{target_info}] 전용 포맷 기준 중간 정산 데이터 마감 완료.\n")

if __name__ == "__main__":
    main()

0개의 댓글