"""
[2단계 - 고도화 메트릭 통합판] 인프라 정규식 필터 + 휘발성 팟 압축 + 신규 4종 메트릭(Throttling, RSS, PV) 정산 엔진
실행 (기본 일배치): python step2_pipeline.py
실행 (임시 요구사항): python step2_pipeline.py --cluster-type COMPUTE --start-date 2026-06-01 --end-date 2026-06-07
"""
import os
import re
import argparse
import boto3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
from config import (
RAW_DIR, MERGED_DIR, MINIO_RAW_BUCKET,
CLUSTER_NODE_PATTERNS, get_workload_type, classify_cluster_infrastructure
)
# Korea Standard Time (UTC+9); main() uses it to build the timezone-aware
# processing window from "now" and from the --start-date/--end-date arguments.
KST = timezone(timedelta(hours=9))
def _iso_date_arg(value):
    """argparse ``type`` callback that accepts only ``YYYY-MM-DD`` strings.

    The value is returned unchanged (still a ``str``) so callers that parse it
    themselves keep working; only malformed input is rejected up front.
    """
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD"
        ) from None
    return value


def parse_arguments(argv=None):
    """Parse the command-line options of the reprocessing pipeline.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behavior.

    Returns:
        argparse.Namespace with ``days`` (int or None), ``start_date`` and
        ``end_date`` (``YYYY-MM-DD`` strings or None), ``cluster_type``
        (``"COMPUTE"``, ``"STORAGE"`` or ``"ALL"``) and ``cluster``
        (str or None).

    Raises:
        SystemExit: via ``parser.error`` when a date is malformed, when only
            one of ``--start-date`` / ``--end-date`` is supplied, or when the
            end date precedes the start date.
    """
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument(
        "--days", type=int, default=None,
        help="look back N days (from 00:00 KST of that day) up to now",
    )
    parser.add_argument(
        "--start-date", type=_iso_date_arg, default=None,
        help="first day to process, YYYY-MM-DD (requires --end-date)",
    )
    parser.add_argument(
        "--end-date", type=_iso_date_arg, default=None,
        help="last day to process (inclusive), YYYY-MM-DD (requires --start-date)",
    )
    parser.add_argument(
        "--cluster-type", type=str, default="ALL",
        choices=["COMPUTE", "STORAGE", "ALL"],
        help="restrict processing to nodes of this cluster type",
    )
    parser.add_argument(
        "--cluster", type=str, default=None,
        help="restrict processing to a single cluster name",
    )
    args = parser.parse_args(argv)

    # A lone --start-date or --end-date used to be ignored silently, which ran
    # a different window than the one the operator asked for.
    if (args.start_date is None) != (args.end_date is None):
        parser.error("--start-date and --end-date must be given together")
    if args.start_date is not None:
        first_day = datetime.strptime(args.start_date, "%Y-%m-%d")
        last_day = datetime.strptime(args.end_date, "%Y-%m-%d")
        if last_day < first_day:
            parser.error("--end-date must not be earlier than --start-date")
    return args
def get_minio_client():
    """Create a boto3 S3 client configured from MINIO_* environment variables.

    Reads ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY``;
    a variable that is not set is passed through as ``None``. Requests are
    signed with signature version ``s3v4``.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    read_env = os.environ.get
    signing_config = boto3.session.Config(signature_version="s3v4")
    connection_kwargs = {
        "endpoint_url": read_env("MINIO_ENDPOINT"),
        "aws_access_key_id": read_env("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": read_env("MINIO_SECRET_KEY"),
        "config": signing_config,
    }
    return boto3.client("s3", **connection_kwargs)
def filter_target_nodes(df, cluster_type="ALL", patterns=None):
    """Keep only the rows whose ``node`` name matches the selected cluster type.

    Args:
        df: DataFrame with a string ``node`` column. Rows whose node is
            missing never match (``na=False``).
        cluster_type: Key into the pattern table, compared case-insensitively.
            ``"ALL"`` ORs every pattern together. A key that is not in the
            table falls back to ``".*"`` (every non-null node passes).
        patterns: Optional mapping of cluster type to node-name regex.
            Defaults to ``CLUSTER_NODE_PATTERNS`` from config; pass an explicit
            mapping to reuse the filter with another pattern table.

    Returns:
        A new DataFrame with the matching rows and a fresh 0..n-1 index.
    """
    if patterns is None:
        patterns = CLUSTER_NODE_PATTERNS

    c_type = cluster_type.upper()
    initial_rows = len(df)
    print(f"\n⚙️ [인프라 필터 가동] 선택 옵션: [{c_type} CLUSTER]")

    # Pick the regex first, then apply the (previously duplicated) filter once.
    if c_type == "ALL":
        node_pattern = "|".join(patterns.values())
        print(f"🔍 전사 클러스터 통합 정규식 스캔 기동: '{node_pattern}'")
    else:
        node_pattern = patterns.get(c_type, ".*")
        print(f"🎯 [{c_type}] 전용 노드이름 형식 정밀 스캔 기동: '{node_pattern}'")

    matches = df["node"].str.contains(node_pattern, regex=True, na=False, case=False)
    df_filtered = df[matches].reset_index(drop=True)

    dropped_rows = initial_rows - len(df_filtered)
    print(f"✂️ [필터 스캔 마감] 원천 {initial_rows:,}행 중 타겟 외 {dropped_rows:,}행 제거 ➡️ {len(df_filtered):,}행 생존")
    return df_filtered
def _is_missing_object_error(exc):
    """Return True when *exc* carries a botocore-style "object not found" code."""
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    error = response.get("Error") or {}
    return str(error.get("Code", "")) in {"404", "NoSuchKey", "NotFound"}


def sync_down_raw_from_minio(s3_client, start_dt, end_dt):
    """Download the raw Parquet partitions for ``[start_dt, end_dt)`` into RAW_DIR.

    The window is walked in 6-hour steps starting at ``start_dt``. For each
    step two objects are considered under the ``raw/`` prefix of
    ``MINIO_RAW_BUCKET``: ``prom_raw_<YYYYMMDD_HH>.parquet`` and its
    ``_active`` variant. A non-empty local copy of the plain file is reused;
    ``_active`` files are downloaded again on every run.

    The sync stays best-effort: an object that does not exist is skipped
    quietly, and any other failure is reported on stdout instead of being
    swallowed, then the loop continues with the next object.

    Args:
        s3_client: Object exposing ``head_object`` and ``download_file``
            (a boto3 S3 client).
        start_dt: Start of the window (datetime, inclusive).
        end_dt: End of the window (datetime, exclusive).
    """
    chunk_delta = timedelta(hours=6)
    print(f"🪣 [MinIO 레이크 리싱크] {start_dt.strftime('%m-%d')} ~ {end_dt.strftime('%m-%d')} 파티션 다운로드 스캔...")

    # Without the target directory every download would fail.
    Path(RAW_DIR).mkdir(parents=True, exist_ok=True)

    current_start = start_dt
    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        for suffix in ("", "_active"):
            f_name = f"prom_raw_{chunk_str}{suffix}.parquet"
            local_path = RAW_DIR / f_name
            always_refresh = bool(suffix)

            # Skip the remote round-trip entirely when a usable copy is cached.
            is_cached = local_path.exists() and local_path.stat().st_size > 0
            if is_cached and not always_refresh:
                continue

            key = f"raw/{f_name}"
            try:
                s3_client.head_object(Bucket=MINIO_RAW_BUCKET, Key=key)
                s3_client.download_file(MINIO_RAW_BUCKET, key, str(local_path))
            except Exception as exc:
                # A missing partition is expected; anything else (credentials,
                # network, disk) must be visible to the operator.
                if not _is_missing_object_error(exc):
                    print(f"⚠️ [MinIO 동기화 경고] {key} 다운로드 실패: {exc}")
        current_start += chunk_delta
def _resolve_time_window(args, now_kst):
    """Return ``(start_dt, end_dt, date_label)`` for the requested run mode.

    Explicit dates cover ``[start 00:00, end + 1 day 00:00)``; ``--days N``
    covers midnight N days ago up to ``now_kst``; the default is the whole of
    yesterday. All datetimes are KST-aware and the window is half-open.
    """
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
        return start_dt, end_dt, f"{args.start_date}_to_{args.end_date}"

    # `is not None` so that an explicit `--days 0` (today so far) is honored
    # instead of silently falling through to the daily default.
    if args.days is not None:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        return start_dt, now_kst, f"recent_{args.days}days"

    yesterday = (now_kst - timedelta(days=1)).date()
    start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
    # Half-open end at the next midnight, consistent with the explicit-date mode.
    end_dt = start_dt + timedelta(days=1)
    return start_dt, end_dt, yesterday.strftime("%Y-%m-%d")


def _p95(series):
    """Return the 95th percentile of *series* (groupby aggregator)."""
    return series.quantile(0.95)


def _aggregate_pod_metrics(df_raw):
    """Aggregate raw samples per day/cluster/namespace/node/pod/container.

    Adds allocation, usage and waste columns plus the status label.
    """
    group_keys = ["date", "cluster", "cluster_type", "namespace", "workload_type", "node", "pod", "container"]
    df_pod = df_raw.groupby(group_keys).agg(
        # Row count per group; used below as minutes, i.e. this presumes one
        # sample per minute -- NOTE(review): confirm the scrape interval.
        minutes_running=("timestamp", "size"),
        cpu_request_max=("cpu_request", "max"),
        cpu_limit_max=("cpu_limit", "max"),
        cpu_usage_p95=("cpu_usage", _p95),
        cpu_throttled_max=("cpu_throttled", "max"),
        mem_request_max=("mem_request", "max"),
        mem_limit_max=("mem_limit", "max"),
        mem_usage_p95=("mem_usage", _p95),
        mem_rss_p95=("mem_rss", _p95),
        oom_strike_sum=("oom_event", "sum"),
        pv_capacity_max=("pv_capacity", "max"),
        pv_used_p95=("pv_used", _p95),
    ).reset_index().fillna(0)

    # Memory and PV values are divided by 1024**3 (bytes -> GiB, labelled "gb").
    byte_to_gb_cols = ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]
    for col in byte_to_gb_cols:
        df_pod[col] = df_pod[col] / (1024**3)

    hours_running = df_pod["minutes_running"] / 60.0
    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours_running
    df_pod["cpu_usage_core_hours"] = df_pod["cpu_usage_p95"] * hours_running
    df_pod["cpu_waste_core_hours"] = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
    df_pod["mem_allocated_gb_hours"] = df_pod["mem_request_max"] * hours_running
    df_pod["mem_usage_gb_hours"] = df_pod["mem_usage_p95"] * hours_running
    df_pod["mem_waste_gb_hours"] = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)
    df_pod["pv_allocated_gb_hours"] = df_pod["pv_capacity_max"] * hours_running
    df_pod["pv_usage_gb_hours"] = df_pod["pv_used_p95"] * hours_running
    df_pod["pv_waste_gb_hours"] = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

    # Priority order: OOM > under-provisioned > over-provisioned > optimized.
    under_provisioned = (df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2)
    over_provisioned = (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50)
    df_pod["status"] = np.where(
        df_pod["is_oom_killed"], "💥 OOM장애발생",
        np.where(under_provisioned, "⚠️ Request부족",
                 np.where(over_provisioned, "📉 과다할당", "✅ 최적화완료")))
    return df_pod


def _build_namespace_pareto(df_pod):
    """Roll pod rows up per namespace and rank namespaces by wasted CPU core-hours."""
    df_ns = (
        df_pod.groupby("namespace").agg(
            minutes_running_sum=("minutes_running", "sum"),
            container_cnt=("container", "count"),
            total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
            total_waste_core_hours=("cpu_waste_core_hours", "sum"),
        )
        .reset_index()
        .sort_values(by="total_waste_core_hours", ascending=False)
        .reset_index(drop=True)
    )
    total_waste = df_ns["total_waste_core_hours"].sum()
    # Non-zero stand-in denominator keeps every share at 0 when nothing is wasted.
    global_total_waste = total_waste if total_waste > 0 else 0.1
    share_pct = df_ns["total_waste_core_hours"] / global_total_waste * 100
    df_ns["waste_share_pct"] = share_pct.round(2)
    # Accumulate the unrounded shares so rounding error does not pile up and
    # the last row ends at 100 whenever any waste exists.
    df_ns["waste_cumsum_pct"] = share_pct.cumsum().round(2)
    return df_ns


def main():
    """Run the step-2 pipeline: sync raw data, filter, aggregate, write outputs.

    Steps:
      1. Parse CLI options and resolve the KST processing window.
      2. Download the matching raw partitions from MinIO into RAW_DIR.
      3. Load the local ``prom_raw_*.parquet`` files, keep rows inside the
         window, and apply the node-name and optional ``--cluster`` filters.
      4. Collapse Spark executor / Airflow worker pods into one pooled
         pod/container name per workload type.
      5. Aggregate per pod and per namespace, then write
         ``enriched_fixed_7d.parquet``, ``pareto_fixed_ns.parquet`` and
         ``meta_run_info.txt`` into MERGED_DIR.

    Returns early (writing nothing) when there are no raw files or when a
    filter leaves no rows.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    start_dt, end_dt, date_label = _resolve_time_window(args, datetime.now(KST))

    sync_down_raw_from_minio(s3_client, start_dt, end_dt)

    # NOTE(review): every local raw file is loaded, not only those inside the
    # window, and both the plain and "_active" file of a chunk are read; if
    # they can overlap, rows would be double counted -- confirm with the writer.
    raw_files = sorted(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("⚠️ 처리할 원천 Parquet 파일이 레이크하우스에 없습니다.")
        return
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)

    # NOTE(review): the comparison drops tzinfo, so it presumes "timestamp"
    # holds naive KST wall-clock values -- confirm against the collector.
    window_start = start_dt.replace(tzinfo=None)
    window_end = end_dt.replace(tzinfo=None)
    in_window = (df_raw["timestamp"] >= window_start) & (df_raw["timestamp"] < window_end)
    df_raw = df_raw[in_window].reset_index(drop=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")

    df_raw = filter_target_nodes(df_raw, cluster_type=args.cluster_type)
    if df_raw.empty:
        print("⚠️ 필터를 통과한 노드가 존재하지 않아 파이프라인을 조기 마감합니다.")
        return

    cluster_meta = df_raw["node"].apply(classify_cluster_infrastructure)
    df_raw["cluster"] = [meta[0] for meta in cluster_meta]
    df_raw["cluster_type"] = [meta[1] for meta in cluster_meta]
    if args.cluster:
        df_raw = df_raw[df_raw["cluster"] == args.cluster].reset_index(drop=True)
        # Stop here rather than writing empty result files for a cluster
        # name that matched nothing.
        if df_raw.empty:
            print(f"⚠️ 클러스터 [{args.cluster}]에 해당하는 데이터가 없어 파이프라인을 조기 마감합니다.")
            return

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)
    # Pods of these workload types are merged under one pooled pod/container
    # name so they aggregate into a single group.
    pooled_names = {
        "SPARK_EXECUTOR": ("spark-executor-pool", "executor"),
        "AIRFLOW_WORKER": ("airflow-worker-pool", "worker"),
    }
    for workload, (pool_pod, pool_container) in pooled_names.items():
        is_workload = df_raw["workload_type"] == workload
        df_raw["pod"] = np.where(is_workload, pool_pod, df_raw["pod"])
        df_raw["container"] = np.where(is_workload, pool_container, df_raw["container"])

    # Metrics missing from the raw data are filled with 0.0 so the
    # aggregation below always finds its columns.
    target_fields = [
        "cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled",
        "mem_request", "mem_limit", "mem_usage", "mem_rss",
        "oom_event", "pv_capacity", "pv_used",
    ]
    for col in target_fields:
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    print("📊 고정밀 시계열 기반 팟 단위 다차원 FinOps 집계 연산 가동...")
    df_pod = _aggregate_pod_metrics(df_raw)
    df_ns = _build_namespace_pareto(df_pod)

    Path(MERGED_DIR).mkdir(parents=True, exist_ok=True)
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)

    target_info = args.cluster if args.cluster else args.cluster_type
    # Explicit encoding: the cluster name may contain non-ASCII characters.
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")
    print(f"💾 [{target_info}] 전용 노드 형식 기준 중간 정산 데이터 파구조화 및 마감 완수.\n")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()