"""
[2단계] AIStor 역풀링 + 정규식 일원화 기반 새 위계(cluster / workload_domain) 정산 엔진
"""
import argparse
import boto3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
import config
from config import (
RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS,
get_workload_type, classify_node_infrastructure
)
# Fixed UTC+9 offset; main() builds every date window with this timezone.
KST = timezone(timedelta(hours=9))
# Weights applied to CPU core-hours, memory GB-hours and PV GB-hours
# (in that order) when main() computes ``final_usage_score``.
W_CPU, W_MEM, W_PV = 1.0, 0.11, 0.02
def parse_arguments(argv=None):
    """Parse the command-line options of the reprocessing engine.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        argparse.Namespace with ``days``, ``start_date``, ``end_date``,
        ``cluster`` and ``workload_domain``.

    Raises:
        SystemExit: Raised by argparse on invalid input, including a
            ``--start-date`` / ``--end-date`` that is supplied without its
            counterpart or that is not in ``YYYY-MM-DD`` form.
    """
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    parser.add_argument("--cluster", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"])
    parser.add_argument("--workload-domain", type=str, default=None)
    args = parser.parse_args(argv)

    # The explicit date range is only honoured when both ends are present;
    # reject a half-specified range instead of silently using another window.
    if bool(args.start_date) != bool(args.end_date):
        parser.error("--start-date and --end-date must be given together")

    # Fail early with a usage error rather than a strptime traceback later.
    for flag, value in (("--start-date", args.start_date), ("--end-date", args.end_date)):
        if not value:
            continue
        try:
            datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            parser.error(f"{flag} must be in YYYY-MM-DD format, got {value!r}")

    return args
def get_minio_client():
    """Create the boto3 S3 client used to talk to the MinIO endpoint.

    The endpoint URL and the credentials are read from the ``MINIO_ENDPOINT``,
    ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY`` environment variables, and
    requests are signed with signature version ``s3v4``.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    read_env = config.os.getenv
    connection = {
        "endpoint_url": read_env("MINIO_ENDPOINT"),
        "aws_access_key_id": read_env("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": read_env("MINIO_SECRET_KEY"),
        "config": boto3.session.Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **connection)
def filter_target_nodes(df, cluster_opt="ALL", patterns=None):
    """Keep only the rows whose ``node`` matches the selected cluster pattern.

    Args:
        df: DataFrame with a ``node`` column.
        cluster_opt: Cluster selector, compared case-insensitively. ``"ALL"``
            (or ``None``) matches any of the known patterns; a key missing
            from ``patterns`` falls back to ``".*"``, which keeps every row
            with a non-null node.
        patterns: Optional mapping of upper-case cluster name to regex.
            Defaults to the module-level ``CLUSTER_NODE_PATTERNS``.

    Returns:
        A new DataFrame holding the matching rows, with a fresh 0..n-1 index.
        Rows whose ``node`` is null never match.
    """
    if patterns is None:
        patterns = CLUSTER_NODE_PATTERNS

    # Nothing to match; also avoids the .str accessor failing on an empty
    # column that carries no string dtype.
    if df.empty:
        return df.reset_index(drop=True)

    selector = (cluster_opt or "ALL").upper()
    if selector == "ALL":
        node_regex = "|".join(patterns.values())
    else:
        node_regex = patterns.get(selector, ".*")

    is_target = df["node"].str.contains(node_regex, regex=True, na=False, case=False)
    return df[is_target].reset_index(drop=True)
def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    """Download the hourly raw parquet files of a time window into RAW_DIR.

    One object per hour is expected under ``raw/prom_raw_<YYYYmmdd_HH>.parquet``
    for every hour in ``[start_dt, end_dt)``. Files already present locally
    are skipped without contacting the server. The sync is best-effort: a
    missing object is skipped quietly, any other failure is reported and the
    loop moves on to the next hour.

    Args:
        s3_client: Client exposing ``head_object`` and ``download_file``.
        bucket_name: Bucket that holds the ``raw/`` prefix.
        start_dt: First hour to fetch (inclusive).
        end_dt: End of the window (exclusive).
    """
    print("📥 [AIStor 레이크하우스 풀링] 1시간 단위 파일 다운로드...")
    # A missing target directory would otherwise make every download fail.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    # Error codes that simply mean "no data was stored for this hour".
    missing_codes = {"404", "NoSuchKey", "NotFound"}
    one_hour = timedelta(hours=1)
    cursor = start_dt
    while cursor < end_dt:
        hour_tag = cursor.strftime("%Y%m%d_%H")
        cursor += one_hour  # advance first so every `continue` below is safe

        f_name = f"prom_raw_{hour_tag}.parquet"
        local_path = RAW_DIR / f_name
        if local_path.exists():
            continue  # cached already; no need for a HEAD round-trip

        object_key = f"raw/{f_name}"
        try:
            s3_client.head_object(Bucket=bucket_name, Key=object_key)
            s3_client.download_file(bucket_name, object_key, str(local_path))
        except Exception as exc:  # best-effort: never abort the whole sync
            response = getattr(exc, "response", None)
            error_code = ""
            if isinstance(response, dict):
                error_code = str(response.get("Error", {}).get("Code", ""))
            if error_code in missing_codes:
                continue
            print(f"⚠️ [AIStor] {object_key} 다운로드 실패 (건너뜀): {exc}")
def _resolve_window(args, now_kst):
    """Return ``(start_dt, end_dt, date_label)`` for the requested run window."""
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # The end date is inclusive, so the window closes at the next midnight.
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
        return start_dt, end_dt, f"{args.start_date}_to_{args.end_date}"
    if args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        return start_dt, now_kst, f"recent_{args.days}days"
    # Default: the whole of yesterday (KST).
    yesterday = (now_kst - timedelta(days=1)).date()
    start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
    end_dt = datetime.combine(yesterday, datetime.max.time()).replace(tzinfo=KST)
    return start_dt, end_dt, yesterday.strftime("%Y-%m-%d")


def _load_raw_window(start_dt, end_dt):
    """Load the cached raw rows inside ``[start_dt, end_dt)``; None if there are none."""
    # NOTE(review): bounds are compared as naive wall-clock values, so the
    # ``timestamp`` column is presumably stored as naive KST - confirm.
    lower = start_dt.replace(tzinfo=None)
    upper = end_dt.replace(tzinfo=None)
    frames = []
    # Filter file by file so peak memory tracks the requested window rather
    # than everything that has accumulated in the local cache.
    for path in sorted(RAW_DIR.glob("prom_raw_*.parquet")):
        frame = pd.read_parquet(path)
        in_window = (frame["timestamp"] >= lower) & (frame["timestamp"] < upper)
        if in_window.any():
            frames.append(frame[in_window])
    if not frames:
        return None
    df_raw = pd.concat(frames, ignore_index=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")
    return df_raw


def _enrich_raw(df_raw, args):
    """Apply the node/domain filters and add cluster, domain and workload labels."""
    df_raw = filter_target_nodes(df_raw, cluster_opt=args.cluster)
    if df_raw.empty:
        return df_raw

    # classify_node_infrastructure yields a pair per node; element 0 is stored
    # as the cluster and element 1 as the workload domain.
    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster"] = [meta[0] for meta in infra_meta]
    df_raw["workload_domain"] = [meta[1] for meta in infra_meta]
    if args.workload_domain:
        df_raw = df_raw[df_raw["workload_domain"] == args.workload_domain].reset_index(drop=True)
        if df_raw.empty:
            return df_raw

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)
    # Collapse these workload types into one synthetic pod/container name so
    # that all their rows aggregate into a single pool per group.
    pooled_names = {
        "SPARK_EXECUTOR": ("spark-executor-pool", "executor"),
        "AIRFLOW_WORKER": ("airflow-worker-pool", "worker"),
    }
    for workload_type, (pool_pod, pool_container) in pooled_names.items():
        is_type = df_raw["workload_type"] == workload_type
        df_raw["pod"] = np.where(is_type, pool_pod, df_raw["pod"])
        df_raw["container"] = np.where(is_type, pool_container, df_raw["container"])
    return df_raw


def _p95(series):
    """Return the 95th percentile of *series*."""
    return series.quantile(0.95)


def _aggregate_pods(df_raw):
    """Aggregate raw samples per date/cluster/domain/namespace/type/node/pod/container."""
    target_fields = ["cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled", "mem_request", "mem_limit", "mem_usage", "mem_rss", "oom_event", "pv_capacity", "pv_used"]
    for col in target_fields:
        # Metrics absent from the raw data are treated as zero.
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    group_keys = ["date", "cluster", "workload_domain", "namespace", "workload_type", "node", "pod", "container"]
    df_pod = df_raw.groupby(group_keys).agg(
        # Row count per group; read as minutes below, i.e. presumably one
        # sample per minute - confirm against the collector.
        minutes_running=("timestamp", "size"),
        cpu_request_max=("cpu_request", "max"),
        cpu_limit_max=("cpu_limit", "max"),
        cpu_usage_p95=("cpu_usage", _p95),
        cpu_throttled_max=("cpu_throttled", "max"),
        mem_request_max=("mem_request", "max"),
        mem_limit_max=("mem_limit", "max"),
        mem_usage_p95=("mem_usage", _p95),
        mem_rss_p95=("mem_rss", _p95),
        oom_strike_sum=("oom_event", "sum"),
        pv_capacity_max=("pv_capacity", "max"),
        pv_used_p95=("pv_used", _p95),
    ).reset_index().fillna(0)

    # Scale size metrics by 1024**3 (presumably bytes -> GiB; confirm units).
    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    hours = df_pod["minutes_running"] / 60.0
    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours
    df_pod["cpu_usage_core_hours"] = df_pod["cpu_usage_p95"] * hours
    df_pod["cpu_waste_core_hours"] = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
    df_pod["mem_allocated_gb_hours"] = df_pod["mem_request_max"] * hours
    df_pod["mem_usage_gb_hours"] = df_pod["mem_usage_p95"] * hours
    df_pod["mem_waste_gb_hours"] = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)
    df_pod["pv_allocated_gb_hours"] = df_pod["pv_capacity_max"] * hours
    df_pod["pv_usage_gb_hours"] = df_pod["pv_used_p95"] * hours
    df_pod["pv_waste_gb_hours"] = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

    # First matching rule wins: OOM, then under-requested, then over-allocated.
    status_rules = [
        df_pod["is_oom_killed"],
        (df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2),
        (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50),
    ]
    status_labels = ["💥 OOM장애발생", "⚠️ Request부족", "📉 과다할당"]
    df_pod["status"] = np.select(status_rules, status_labels, default="✅ 최적화완료")
    return df_pod


def _write_namespace_reports(df_pod):
    """Write the daily per-namespace usage file and the namespace Pareto file."""
    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch=("cpu_usage_core_hours", "sum"),
        cpu_alloc_ch=("cpu_allocated_core_hours", "sum"),
        cpu_waste_ch=("cpu_waste_core_hours", "sum"),
        mem_used_gh=("mem_usage_gb_hours", "sum"),
        mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
        pv_used_gh=("pv_usage_gb_hours", "sum"),
        pv_alloc_gh=("pv_allocated_gb_hours", "sum"),
    ).reset_index()
    df_daily_ns["final_usage_score"] = (
        (df_daily_ns["cpu_alloc_ch"] * W_CPU)
        + (df_daily_ns["mem_alloc_gh"] * W_MEM)
        + (df_daily_ns["pv_alloc_gh"] * W_PV)
    ).round(1)
    df_daily_ns.to_parquet(MERGED_DIR / "daily_ns_usage.parquet", index=False)

    df_ns = df_pod.groupby("namespace").agg(
        minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"),
        total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
        total_waste_core_hours=("cpu_waste_core_hours", "sum"),
    ).reset_index().sort_values(by="total_waste_core_hours", ascending=False).reset_index(drop=True)

    total_waste = df_ns["total_waste_core_hours"].sum()
    # Guard against division by zero when nothing was wasted at all.
    global_total_waste = total_waste if total_waste > 0 else 0.1
    waste_share = df_ns["total_waste_core_hours"] / global_total_waste * 100
    df_ns["waste_share_pct"] = waste_share.round(2)
    # Accumulate the exact shares and round once, so per-row rounding error
    # does not pile up in the cumulative column.
    df_ns["waste_cumsum_pct"] = waste_share.cumsum().round(2)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)


def main():
    """Run the settlement pipeline end to end.

    Steps: parse the CLI options, resolve the KST time window, pull the
    hourly raw files for that window from MinIO, load and label the rows,
    aggregate them per pod/container, and write the result files plus
    ``meta_run_info.txt`` into MERGED_DIR. When no rows remain for the
    requested window or filters, a notice is printed and the existing output
    files are left untouched.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = config.os.getenv("MINIO_RAW_BUCKET")

    start_dt, end_dt, date_label = _resolve_window(args, datetime.now(KST))
    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)

    df_raw = _load_raw_window(start_dt, end_dt)
    if df_raw is None:
        print("⚠️ 지정 구간에 해당하는 원천 데이터가 없어 정산을 건너뜁니다.")
        return

    df_raw = _enrich_raw(df_raw, args)
    if df_raw.empty:
        print("⚠️ 필터 조건에 해당하는 데이터가 없어 정산을 건너뜁니다.")
        return

    print("📊 새로운 인프라 위계 기준 팟 단위 다차원 집계 가동...")
    df_pod = _aggregate_pods(df_raw)

    MERGED_DIR.mkdir(parents=True, exist_ok=True)
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)
    _write_namespace_reports(df_pod)

    target_info = args.workload_domain if args.workload_domain else args.cluster
    # Explicit encoding: the label may hold non-ASCII text.
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")
    print(f"💾 [{target_info}] 새 아키텍처 규격 기준 정산 캐시 빌드 마감.\n")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()