26Y01d2

Young-Kyoo Kim·약 13시간 전
"""
[2단계] 로컬 캐시 스마트 활용 + 데이터 변환 완수 후 RAW 일괄 휘발 정제 엔진
"""
import os
import argparse
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path

import config
from config import (
    RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS, 
    get_workload_type, classify_node_infrastructure
)

KST = timezone(timedelta(hours=9))
W_CPU, W_MEM, W_PV = 1.0, 0.11, 0.02

def parse_arguments():
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    parser.add_argument("--cluster", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"])
    parser.add_argument("--workload-domain", type=str, default=None)
    return parser.parse_args()

def get_minio_client():
    return boto3.client(
        "s3", endpoint_url=os.getenv("MINIO_ENDPOINT"), aws_access_key_id=os.getenv("MINIO_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("MINIO_SECRET_KEY"), region_name="us-east-1",
        config=boto3.session.Config(signature_version="s3v4")
    )

def filter_target_nodes(df, cluster_opt="ALL"):
    c_opt = cluster_opt.upper()
    if c_opt == "ALL":
        combined_pattern = "|".join(CLUSTER_NODE_PATTERNS.values())
        return df[df["node"].str.contains(combined_pattern, regex=True, na=False, case=False)]
    else:
        target_pattern = CLUSTER_NODE_PATTERNS.get(c_opt, ".*")
        return df[df["node"].str.contains(target_pattern, regex=True, na=False, case=False)]

def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    """
    💡 [스마트 캐시 레이어] 로컬에 이미 파일이 있다면 AIStor 원격 다운로드를 생략합니다.
    """
    chunk_delta = timedelta(hours=1)
    current_start = start_dt
    print(f"\n📡 [AIStor 스마트 캐시 엔진] 원천 데이터 가용성 체크 시작...")

    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        f_name = f"prom_raw_{chunk_str}.parquet"
        object_key = f"raw/{f_name}"
        local_path = RAW_DIR / f_name
        
        if local_path.exists() and local_path.stat().st_size > 0:
            # 🎯 [Cache Hit] step1이 이미 생성해 둔 경우 트래픽 유실 차단
            print(f"   ⚡ [Cache Hit] 로컬 캐시 자산 활용 (다운로드 스킵): {f_name}")
        else:
            # 🎯 [Cache Miss] 로컬에 없을 때만 원격 AIStor에서 다운로드 수행
            try:
                s3_client.head_object(Bucket=bucket_name, Key=object_key)
                print(f"   📥 [Cache Miss] AIStor 백업으로부터 다운로드: {object_key}")
                s3_client.download_file(bucket_name, object_key, str(local_path))
            except ClientError as e:
                if e.response['Error']['Code'] != "404":
                    print(f"   ❌ AIStor 통신 실패 에러 ({f_name}): {str(e)}")
            except Exception as e:
                print(f"   ⚠️ 일반 다운로드 예외 발생 ({f_name}): {str(e)}")
                
        current_start += chunk_delta

def main():
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    now_kst = datetime.now(KST)
    
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(yesterday, datetime.max.time()).replace(tzinfo=KST)

    # 1. 원천 데이터 체크 및 다운로드 동기화 기동
    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)

    # 2. 이번 배치 연산 대상 로컬 파일 목록 확보
    raw_files = list(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("\n🚨 [중단] 정산 가동할 물리 원천 파일이 존재하지 않습니다.")
        return

    print(f"\n📊 [데이터 병합] {len(raw_files)}개 시계열 파티션 로드 중...")
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)
    
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")
    df_raw = df_raw[(df_raw["timestamp"] >= start_dt.replace(tzinfo=None)) & (df_raw["timestamp"] < end_dt.replace(tzinfo=None))].reset_index(drop=True)
    if df_raw.empty: return

    # 노드 규칙 필터링
    df_raw = filter_target_nodes(df_raw, cluster_opt=args.cluster)
    if df_raw.empty: return

    # 위계 매핑
    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster"]         = [x[0] for x in infra_meta]
    df_raw["workload_domain"] = [x[1] for x in infra_meta]

    if args.workload_domain:
        df_raw = df_raw[df_raw["workload_domain"] == args.workload_domain]

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)

    # 카디널리티 마스킹
    df_raw["pod"] = np.where(df_raw["workload_type"] == "SPARK_EXECUTOR", "spark-executor-pool", df_raw["pod"])
    df_raw["pod"] = np.where(df_raw["workload_type"] == "AIRFLOW_WORKER", "airflow-worker-pool", df_raw["pod"])
    df_raw["container"] = np.where(df_raw["workload_type"] == "SPARK_EXECUTOR", "executor", df_raw["container"])
    df_raw["container"] = np.where(df_raw["workload_type"] == "AIRFLOW_WORKER", "worker", df_raw["container"])

    target_fields = ["cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled", "mem_request", "mem_limit", "mem_usage", "mem_rss", "oom_event", "pv_capacity", "pv_used"]
    for col in target_fields:
        if col not in df_raw.columns: df_raw[col] = 0.0

    df_pod = df_raw.groupby(["date", "cluster", "workload_domain", "namespace", "workload_type", "node", "pod", "container"]).agg(
        minutes_running      = ("timestamp", "size"), cpu_request_max      = ("cpu_request", "max"), cpu_limit_max        = ("cpu_limit", "max"),
        cpu_usage_p95        = ("cpu_usage", lambda x: x.quantile(0.95)), cpu_throttled_max    = ("cpu_throttled", "max"),
        mem_request_max      = ("mem_request", "max"), mem_limit_max        = ("mem_limit", "max"), mem_usage_p95        = ("mem_usage", lambda x: x.quantile(0.95)),
        mem_rss_p95          = ("mem_rss", lambda x: x.quantile(0.95)), oom_strike_sum       = ("oom_event", "sum"),
        pv_capacity_max      = ("pv_capacity", "max"), pv_used_p95          = ("pv_used", lambda x: x.quantile(0.95))
    ).reset_index().fillna(0)

    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * (df_pod["minutes_running"] / 60.0)
    df_pod["cpu_usage_core_hours"]     = df_pod["cpu_usage_p95"] * (df_pod["minutes_running"] / 60.0)
    df_pod["cpu_waste_core_hours"]     = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
    df_pod["mem_allocated_gb_hours"]   = df_pod["mem_request_max"] * (df_pod["minutes_running"] / 60.0)
    df_pod["mem_usage_gb_hours"]       = df_pod["mem_usage_p95"] * (df_pod["minutes_running"] / 60.0)
    df_pod["mem_waste_gb_hours"]       = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)
    df_pod["pv_allocated_gb_hours"]    = df_pod["pv_capacity_max"] * (df_pod["minutes_running"] / 60.0)
    df_pod["pv_usage_gb_hours"]        = df_pod["pv_used_p95"] * (df_pod["minutes_running"] / 60.0)
    df_pod["pv_waste_gb_hours"]        = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    df_pod["status"] = np.where(df_pod["oom_strike_sum"] > 0, "💥 OOM장애발생",
                        np.where((df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2), "⚠️ Request부족", 
                        np.where((df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50), "📉 과다할당", "✅ 최적화완료")))

    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)

    # Namespace 일단위 롤업
    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch=("cpu_usage_core_hours", "sum"), cpu_alloc_ch=("cpu_allocated_core_hours", "sum"), cpu_waste_ch=("cpu_waste_core_hours", "sum"),
        mem_used_gh=("mem_usage_gb_hours", "sum"), mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
        pv_used_gh=("pv_usage_gb_hours", "sum"), pv_alloc_gh=("pv_allocated_gb_hours", "sum")
    ).reset_index()
    df_daily_ns["final_usage_score"] = ((df_daily_ns["cpu_alloc_ch"] * W_CPU) + (df_daily_ns["mem_alloc_gh"] * W_MEM) + (df_daily_ns["pv_alloc_gh"] * W_PV)).round(1)
    df_daily_ns.to_parquet(MERGED_DIR / "daily_ns_usage.parquet", index=False)

    # 파레토 롤업
    df_ns = df_pod.groupby("namespace").agg(
        total_waste_core_hours=("cpu_waste_core_hours", "sum"), minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"), total_allocated_core_hours=("cpu_allocated_core_hours", "sum")
    ).reset_index().sort_values("total_waste_core_hours", ascending=False)
    global_total_waste = df_ns["total_waste_core_hours"].sum() or 0.1
    df_ns["waste_share_pct"] = (df_ns["total_waste_core_hours"] / global_total_waste * 100).round(2)
    df_ns["waste_cumsum_pct"] = df_ns["waste_share_pct"].cumsum().round(2)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)

    date_label = start_dt.strftime("%Y%m%d") + "_to_" + (end_dt - timedelta(days=1)).strftime("%Y%m%d")
    target_info = args.workload_domain if args.workload_domain else args.cluster
    with open(MERGED_DIR / "meta_run_info.txt", "w") as mf:
        mf.write(f"{target_info}@{date_label}")

    print(f"💾 요약 분석 가공 원부 배치 마감 완료. (위계: {target_info})")

    # ─── 💡 [요청 반영 핵심] 데이터Reprocessing이 성공 완수되었으므로 무겁던 RAW 캐시 강제 소거 ───
    print("\n✂️  [작업 공간 최적화] 다음 단계(차트/엑셀)를 위해 로컬 RAW 파일을 일괄 청소합니다.")
    for f in raw_files:
        try:
            f.unlink()
            print(f"   - 로컬 RAW 파티션 휘발 완수: {f.name}")
        except Exception as e:
            print(f"   - 파일 삭제 실패 주치의 경고 ({f.name}): {str(e)}")

    print("\n🏁 === [Step2 배포 마감] 로컬 무상태 디스크 무결성이 완벽히 복구되었습니다. ===")

if __name__ == "__main__":
    main()

0개의 댓글