"""
[2단계] 동적 인프라 정규식 필터링 및 카디널리티 압축 엔진
실행 (전체): python step2_pipeline.py
실행 (연산전용 포맷만): python step2_pipeline.py --cluster-type COMPUTE
실행 (스토리지포맷만): python step2_pipeline.py --cluster-type STORAGE
"""
import os
import re
import argparse
import boto3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
from config import (
RAW_DIR, MERGED_DIR, MINIO_RAW_BUCKET,
CLUSTER_NODE_PATTERNS, get_workload_type, classify_cluster_infrastructure
)
# Korea Standard Time: a fixed UTC+09:00 offset used for every run-window boundary.
KST = timezone(timedelta(hours=9))


def _iso_date(value):
    """argparse ``type`` hook: validate a YYYY-MM-DD string and return it unchanged.

    The raw string is kept (not converted to a date) because callers build output
    labels such as ``<start>_to_<end>`` directly from it.
    """
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"expected YYYY-MM-DD, got {value!r}") from exc
    return value


def parse_arguments(argv=None):
    """Parse the step-2 command line.

    Args:
        argv: Argument list to parse. None (the default) means ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``days`` (int or None), ``start_date`` / ``end_date``
        (YYYY-MM-DD strings or None), ``cluster_type`` (upper-cased; one of
        COMPUTE / STORAGE / ALL) and ``cluster`` (str or None).

    Exits with status 2 (via ``parser.error``) when only one of --start-date /
    --end-date is given, when the range is reversed, or when --days is negative.
    Otherwise those combinations would silently select a different date window.
    """
    parser = argparse.ArgumentParser(description="FinOps Dynamic Gatekeeper Engine")
    parser.add_argument(
        "--days", type=int, default=None,
        help="number of days to look back (window starts at KST midnight)",
    )
    parser.add_argument(
        "--start-date", type=_iso_date, default=None,
        help="first day of the window (YYYY-MM-DD, KST); requires --end-date",
    )
    parser.add_argument(
        "--end-date", type=_iso_date, default=None,
        help="last day of the window, inclusive (YYYY-MM-DD, KST); requires --start-date",
    )
    # str.upper runs before the choices check, so "compute" is accepted as COMPUTE.
    parser.add_argument(
        "--cluster-type", type=str.upper, default="ALL",
        choices=["COMPUTE", "STORAGE", "ALL"],
        help="node-name pattern family to keep (case-insensitive)",
    )
    parser.add_argument(
        "--cluster", type=str, default=None,
        help="keep only this cluster after node classification",
    )
    args = parser.parse_args(argv)

    if (args.start_date is None) != (args.end_date is None):
        parser.error("--start-date and --end-date must be given together")
    if args.start_date is not None and (
        datetime.strptime(args.start_date, "%Y-%m-%d")
        > datetime.strptime(args.end_date, "%Y-%m-%d")
    ):
        parser.error("--start-date must not be after --end-date")
    if args.days is not None and args.days < 0:
        parser.error("--days must be >= 0")
    return args
def get_minio_client():
    """Create a boto3 S3 client aimed at the MinIO object store.

    The endpoint URL, access key and secret key are read from the MINIO_ENDPOINT,
    MINIO_ACCESS_KEY and MINIO_SECRET_KEY environment variables (each is None when
    unset). Requests are signed with signature version ``s3v4``.
    """
    read_env = os.environ.get
    client_settings = {
        "endpoint_url": read_env("MINIO_ENDPOINT"),
        "aws_access_key_id": read_env("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": read_env("MINIO_SECRET_KEY"),
        "config": boto3.session.Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **client_settings)
def filter_target_nodes(df, cluster_type="ALL"):
    """Keep only rows whose ``node`` name matches the configured naming regex.

    Args:
        df: Metrics frame containing a ``node`` column.
        cluster_type: Case-insensitive key into CLUSTER_NODE_PATTERNS, or "ALL" to
            OR every configured pattern together. A key that is not configured falls
            back to ``.*``, so every non-null node passes.

    Returns:
        A new frame re-indexed from 0. Matching is case-insensitive, uses
        ``re.search`` semantics, and always drops rows whose node name is missing.
    """
    mode = cluster_type.upper()
    row_count_before = len(df)
    print(f"\n⚙️ [인프라 필터 가동] 선택 옵션: [{mode} CLUSTER]")

    if mode != "ALL":
        pattern = CLUSTER_NODE_PATTERNS.get(mode, ".*")
        print(f"🎯 [{mode}] 전용 노드이름 형식 정밀 스캔 기동: '{pattern}'")
    else:
        pattern = "|".join(CLUSTER_NODE_PATTERNS.values())
        print(f"🔍 전사 클러스터 통합 정규식 스캔 기동: '{pattern}'")

    keep_mask = df["node"].str.contains(pattern, regex=True, na=False, case=False)
    survivors = df[keep_mask].reset_index(drop=True)

    print(f"✂️ [필터 스캔 마감] 원천 {row_count_before:,}행 중 타겟 외 {row_count_before - len(survivors):,}행 노이즈 즉시 사살 ➡️ {len(survivors):,}행 생존\n")
    return survivors
def sync_down_raw_from_minio(s3_client, start_dt, end_dt):
chunk_delta = timedelta(hours=6)
current_start = start_dt
print(f"🪣 [MinIO 레이크 리싱크] {start_dt.strftime('%m-%d')} ~ {end_dt.strftime('%m-%d')} 파일 스캔...")
while current_start < end_dt:
chunk_str = current_start.strftime("%Y%m%d_%H")
for f_name in [f"prom_raw_{chunk_str}.parquet", f"prom_raw_{chunk_str}_active.parquet"]:
local_path = RAW_DIR / f_name
try:
s3_client.head_object(Bucket=MINIO_RAW_BUCKET, Key=f"raw/{f_name}")
if not (local_path.exists() and local_path.stat().st_size > 0 and "_active" not in f_name):
s3_client.download_file(MINIO_RAW_BUCKET, f"raw/{f_name}", str(local_path))
except Exception:
pass
current_start += chunk_delta
# workload_type -> (pooled pod name, pooled container name)
_POOLED_WORKLOADS = {
    "SPARK_EXECUTOR": ("spark-executor-pool", "executor"),
    "AIRFLOW_WORKER": ("airflow-worker-pool", "worker"),
}


def _resolve_run_window(args, now_kst):
    """Translate CLI options into the aggregation window.

    Returns:
        (start_dt, end_dt, date_label). Both datetimes are KST-aware and the window is
        half-open: rows with start_dt <= timestamp < end_dt are kept.
    """
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # end_date is inclusive, so the exclusive bound is the following midnight.
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
        return start_dt, end_dt, f"{args.start_date}_to_{args.end_date}"
    # `is not None` rather than truthiness, so --days 0 means "since today's midnight"
    # instead of silently falling through to the yesterday default.
    if args.days is not None:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        return start_dt, now_kst, f"recent_{args.days}days"
    # Default: all of yesterday (KST). Use the next midnight as the exclusive bound, as the
    # explicit-range branch does; datetime.max.time() with `<` would drop 23:59:59.999999.
    yesterday = (now_kst - timedelta(days=1)).date()
    start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
    return start_dt, start_dt + timedelta(days=1), yesterday.strftime("%Y-%m-%d")


def _load_raw_window(start_dt, end_dt):
    """Load local raw Parquet chunks and keep only rows with start_dt <= timestamp < end_dt.

    Returns None when RAW_DIR contains no ``prom_raw_*.parquet`` file at all. Otherwise
    returns the concatenated (possibly empty) frame with an added ``date`` column
    formatted as YYYY-MM-DD.
    """
    raw_files = sorted(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        return None
    # NOTE(review): timestamps are compared against tz-stripped bounds, so the raw column is
    # assumed to hold naive KST wall-clock times — confirm against the step-1 writer.
    # NOTE(review): the glob matches both `<chunk>.parquet` and `<chunk>_active.parquet`; if
    # both copies of one chunk exist locally their rows are both loaded — confirm they
    # never overlap, otherwise minutes_running is double counted.
    lower = start_dt.replace(tzinfo=None)
    upper = end_dt.replace(tzinfo=None)
    frames = []
    for path in raw_files:
        frame = pd.read_parquet(path)
        # Trim each file before concatenating so peak memory follows the window, not the lake size.
        frames.append(frame[(frame["timestamp"] >= lower) & (frame["timestamp"] < upper)])
    df_raw = pd.concat(frames, ignore_index=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")
    return df_raw


def _pool_ephemeral_workloads(df_raw):
    """Tag rows with ``workload_type`` and collapse per-instance names of pooled workloads.

    Spark executor and Airflow worker rows get one shared pod/container name each, which
    keeps the later groupby cardinality bounded. Mutates and returns ``df_raw``.
    """
    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)
    for workload, (pod_name, container_name) in _POOLED_WORKLOADS.items():
        is_pooled = df_raw["workload_type"] == workload
        # np.where (not .loc) so this also works for categorical/extension dtypes.
        df_raw["pod"] = np.where(is_pooled, pod_name, df_raw["pod"])
        df_raw["container"] = np.where(is_pooled, container_name, df_raw["container"])
    return df_raw


def _aggregate_pod_metrics(df_raw):
    """Build per-container usage, waste and status rows from the raw samples.

    Groups by (date, cluster, cluster_type, namespace, workload_type, node, pod,
    container). For each group it computes:

    - ``minutes_running``: the sample count.
    - Max request/limit values.
    - p95 usage.
    - OOM event sum.

    Memory columns are then divided by 1024**3. Core-/GB-hours, waste, request/limit
    flags and a status label are derived from those figures.

    NOTE(review): dividing the sample count by 60 to get hours presumes one sample per
    minute, and the 1024**3 divisor presumes byte-valued memory columns — confirm.
    NOTE(review): pandas groupby drops rows whose key columns are NaN (dropna=True).
    """
    # Older raw files may lack these columns; treat them as "not set".
    for col in ["cpu_limit", "mem_limit", "oom_event"]:
        if col not in df_raw.columns:
            df_raw[col] = 0.0
    df_pod = df_raw.groupby(
        ["date", "cluster", "cluster_type", "namespace", "workload_type", "node", "pod", "container"]
    ).agg(
        minutes_running=("timestamp", "size"),
        cpu_request_max=("cpu_request", "max"),
        cpu_limit_max=("cpu_limit", "max"),
        cpu_usage_p95=("cpu_usage", lambda x: x.quantile(0.95)),
        mem_request_max=("mem_request", "max"),
        mem_limit_max=("mem_limit", "max"),
        mem_usage_p95=("mem_usage", lambda x: x.quantile(0.95)),
        oom_strike_sum=("oom_event", "sum"),
    ).reset_index().fillna(0)

    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    hours = df_pod["minutes_running"] / 60.0
    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours
    df_pod["cpu_usage_core_hours"] = df_pod["cpu_usage_p95"] * hours
    df_pod["cpu_waste_core_hours"] = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
    df_pod["mem_allocated_gb_hours"] = df_pod["mem_request_max"] * hours
    df_pod["mem_usage_gb_hours"] = df_pod["mem_usage_p95"] * hours
    df_pod["mem_waste_gb_hours"] = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)
    # Precedence: OOM > CPU shortage (> 0.5 cores) > over-allocation (> 10 wasted core-hours) > optimal.
    df_pod["status"] = np.where(df_pod["is_oom_killed"], "💥 OOM장애발생",
                       np.where(df_pod["cpu_shortage_cores"] > 0.5, "⚠️ Request부족",
                       np.where(df_pod["cpu_waste_core_hours"] > 10, "📉 과다할당", "✅ 최적화완료")))
    return df_pod


def _build_namespace_pareto(df_pod):
    """Rank namespaces by total CPU waste and add their share / cumulative share in percent."""
    df_ns = df_pod.groupby("namespace").agg(
        minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"),
        total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
        total_waste_core_hours=("cpu_waste_core_hours", "sum"),
    ).reset_index().sort_values(by="total_waste_core_hours", ascending=False).reset_index(drop=True)
    total_waste = df_ns["total_waste_core_hours"].sum()
    # Waste is clipped at 0 upstream, so a zero total means every share is 0%; 0.1 just
    # avoids dividing by zero in that case.
    denominator = total_waste if total_waste > 0 else 0.1
    df_ns["waste_share_pct"] = (df_ns["total_waste_core_hours"] / denominator * 100).round(2)
    df_ns["waste_cumsum_pct"] = df_ns["waste_share_pct"].cumsum().round(2)
    return df_ns


def main():
    """Run step 2 end to end.

    Steps:
    1. Resolve the KST window and sync raw chunks from MinIO.
    2. Load and filter rows by time, node pattern and (optionally) cluster.
    3. Pool Spark/Airflow replicas.
    4. Write the per-container metrics, the namespace Pareto table and a run-info file
       to MERGED_DIR.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    start_dt, end_dt, date_label = _resolve_run_window(args, datetime.now(KST))

    sync_down_raw_from_minio(s3_client, start_dt, end_dt)

    df_raw = _load_raw_window(start_dt, end_dt)
    if df_raw is None:
        print("⚠️ 처리할 원천 Parquet 파일이 레이크하우스에 없습니다.")
        return

    df_raw = filter_target_nodes(df_raw, cluster_type=args.cluster_type)
    if df_raw.empty:
        print("⚠️ 필터를 통과한 노드가 존재하지 않아 파이프라인을 조기 마감합니다.")
        return

    # classify_cluster_infrastructure yields an indexable pair per node: (cluster, cluster_type).
    cluster_meta = df_raw["node"].apply(classify_cluster_infrastructure)
    df_raw["cluster"] = [meta[0] for meta in cluster_meta]
    df_raw["cluster_type"] = [meta[1] for meta in cluster_meta]

    if args.cluster:
        df_raw = df_raw[df_raw["cluster"] == args.cluster].reset_index(drop=True)
        # Without this guard an unmatched --cluster would overwrite the outputs with empty tables.
        if df_raw.empty:
            print(f"⚠️ '{args.cluster}' 클러스터에 해당하는 노드가 없어 파이프라인을 조기 마감합니다.")
            return

    df_raw = _pool_ephemeral_workloads(df_raw)

    print("📊 고정밀 시계열 기반 팟 단위 리밸런싱 집계 가동...")
    df_pod = _aggregate_pod_metrics(df_raw)

    MERGED_DIR.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the file name says "7d" whatever window was selected; kept unchanged
    # because downstream readers presumably look it up by this exact name.
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)
    df_ns = _build_namespace_pareto(df_pod)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)

    target_info = args.cluster if args.cluster else args.cluster_type
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")
    print(f"💾 [{target_info}] 전용 포맷 기준 중간 정산 데이터 마감 완료.\n")


if __name__ == "__main__":
    main()