"""
[2단계] 로컬 캐시 스마트 활용 + 데이터 변환 완수 후 RAW 일괄 휘발 정제 엔진
"""
import os
import argparse
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
import config
from config import (
RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS,
get_workload_type, classify_node_infrastructure
)
# Fixed UTC+9 offset used for every time-window computation in this script.
KST = timezone(timedelta(hours=9))
# Weights applied to allocated CPU core-hours, memory GB-hours and PV GB-hours
# when they are combined into the per-namespace "final_usage_score".
W_CPU, W_MEM, W_PV = 1.0, 0.11, 0.02
def parse_arguments():
    """Parse the command-line options of the reprocessing engine.

    Returns:
        argparse.Namespace with the attributes ``days``, ``start_date``,
        ``end_date``, ``cluster`` and ``workload_domain``.
    """
    cli = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    # (flag, add_argument keyword arguments) pairs, registered in declaration order.
    option_table = (
        ("--days", {"type": int, "default": None}),
        ("--start-date", {"type": str, "default": None}),
        ("--end-date", {"type": str, "default": None}),
        ("--cluster", {"type": str, "default": "ALL", "choices": ["COMPUTE", "STORAGE", "ALL"]}),
        ("--workload-domain", {"type": str, "default": None}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def get_minio_client():
    """Build a boto3 S3 client for the object store configured through the environment.

    The endpoint and credentials are read from ``MINIO_ENDPOINT``,
    ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY``; the region is fixed to
    ``us-east-1`` and requests are signed with Signature Version 4.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    # Use the documented botocore Config class instead of ``boto3.session.Config``,
    # which only resolves because boto3.session happens to import it internally.
    # The import is function-scoped so this block stays self-contained.
    from botocore.config import Config

    return boto3.client(
        "s3",
        endpoint_url=os.getenv("MINIO_ENDPOINT"),
        aws_access_key_id=os.getenv("MINIO_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("MINIO_SECRET_KEY"),
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )
def filter_target_nodes(df, cluster_opt="ALL"):
    """Keep only the rows whose ``node`` value matches the selected cluster pattern.

    Args:
        df: DataFrame with a string ``node`` column.
        cluster_opt: Cluster selector (upper-cased before lookup). ``"ALL"``
            matches any pattern in ``CLUSTER_NODE_PATTERNS``; a selector that
            is not a key of that mapping falls back to ``".*"``.

    Returns:
        The subset of ``df`` whose ``node`` matches, case-insensitively.
        Rows with a missing ``node`` never match.
    """
    selector = cluster_opt.upper()
    if selector == "ALL":
        node_regex = "|".join(CLUSTER_NODE_PATTERNS.values())
    else:
        node_regex = CLUSTER_NODE_PATTERNS.get(selector, ".*")
    is_target = df["node"].str.contains(node_regex, regex=True, na=False, case=False)
    return df[is_target]
def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    """Make the hourly RAW parquet partitions of ``[start_dt, end_dt)`` available locally.

    For every hour in the window the file ``prom_raw_<YYYYmmdd_HH>.parquet`` is
    looked up in ``RAW_DIR``. A non-empty local file counts as a cache hit and
    is left alone; otherwise the object ``raw/<file name>`` is downloaded from
    ``bucket_name``. Failures are reported and skipped so one missing hour does
    not abort the whole run.

    Args:
        s3_client: boto3 S3 client exposing ``head_object`` and ``download_file``.
        bucket_name: Bucket holding the ``raw/`` objects.
        start_dt: Inclusive start of the window (datetime).
        end_dt: Exclusive end of the window (datetime).
    """
    # Create the cache directory up front so that a fresh workspace does not
    # turn every download into a "directory not found" failure.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    chunk_delta = timedelta(hours=1)
    current_start = start_dt
    print("\n📡 [AIStor 스마트 캐시 엔진] 원천 데이터 가용성 체크 시작...")
    while current_start < end_dt:
        chunk_str = current_start.strftime("%Y%m%d_%H")
        f_name = f"prom_raw_{chunk_str}.parquet"
        object_key = f"raw/{f_name}"
        local_path = RAW_DIR / f_name
        # A zero-byte file is treated as a cache miss and fetched again.
        if local_path.exists() and local_path.stat().st_size > 0:
            print(f" ⚡ [Cache Hit] 로컬 캐시 자산 활용 (다운로드 스킵): {f_name}")
        else:
            try:
                # Probe first so that an absent object is a quiet skip.
                s3_client.head_object(Bucket=bucket_name, Key=object_key)
                print(f" 📥 [Cache Miss] AIStor 백업으로부터 다운로드: {object_key}")
                s3_client.download_file(bucket_name, object_key, str(local_path))
            except ClientError as e:
                # "404" means the partition simply does not exist remotely;
                # any other code is a real communication problem worth reporting.
                error_code = e.response.get("Error", {}).get("Code")
                if error_code != "404":
                    print(f" ❌ AIStor 통신 실패 에러 ({f_name}): {e}")
            except Exception as e:
                # Best-effort per partition: report and continue with the next hour.
                print(f" ⚠️ 일반 다운로드 예외 발생 ({f_name}): {e}")
        current_start += chunk_delta
def _resolve_time_window(args, now_kst):
    """Return the half-open KST window ``(start_dt, end_dt)`` selected by the CLI options.

    Precedence: explicit ``--start-date``/``--end-date`` (end date inclusive),
    then ``--days`` (midnight N days ago up to now), then all of yesterday.
    """
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
    elif args.days is not None:
        # "is not None" so that an explicit --days 0 (today so far) is honoured
        # instead of silently falling through to the "yesterday" default.
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        # Exclusive end at the next midnight, consistent with the other branches.
        end_dt = start_dt + timedelta(days=1)
    return start_dt, end_dt


def _p95(series):
    """Return the 95th percentile of ``series``."""
    return series.quantile(0.95)


def _aggregate_pod_metrics(df_raw):
    """Aggregate raw samples per day/container and derive usage, waste and status columns."""
    group_keys = ["date", "cluster", "workload_domain", "namespace", "workload_type", "node", "pod", "container"]
    df_pod = df_raw.groupby(group_keys).agg(
        # Row count per group; used below as minutes of runtime.
        # NOTE(review): this assumes one sample per minute in the RAW data — confirm.
        minutes_running=("timestamp", "size"),
        cpu_request_max=("cpu_request", "max"),
        cpu_limit_max=("cpu_limit", "max"),
        cpu_usage_p95=("cpu_usage", _p95),
        cpu_throttled_max=("cpu_throttled", "max"),
        mem_request_max=("mem_request", "max"),
        mem_limit_max=("mem_limit", "max"),
        mem_usage_p95=("mem_usage", _p95),
        mem_rss_p95=("mem_rss", _p95),
        oom_strike_sum=("oom_event", "sum"),
        pv_capacity_max=("pv_capacity", "max"),
        pv_used_p95=("pv_used", _p95),
    ).reset_index().fillna(0)

    # Scale memory and PV figures by 1024**3 (presumably bytes -> GiB, given the
    # *_gb_hours columns derived from them — confirm against the collector).
    for col in ["mem_request_max", "mem_limit_max", "mem_usage_p95", "mem_rss_p95", "pv_capacity_max", "pv_used_p95"]:
        df_pod[col] = df_pod[col] / (1024**3)

    hours_running = df_pod["minutes_running"] / 60.0
    # Waste is allocation minus usage, floored at zero when usage exceeds the request.
    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours_running
    df_pod["cpu_usage_core_hours"] = df_pod["cpu_usage_p95"] * hours_running
    df_pod["cpu_waste_core_hours"] = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)
    df_pod["mem_allocated_gb_hours"] = df_pod["mem_request_max"] * hours_running
    df_pod["mem_usage_gb_hours"] = df_pod["mem_usage_p95"] * hours_running
    df_pod["mem_waste_gb_hours"] = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)
    df_pod["pv_allocated_gb_hours"] = df_pod["pv_capacity_max"] * hours_running
    df_pod["pv_usage_gb_hours"] = df_pod["pv_used_p95"] * hours_running
    df_pod["pv_waste_gb_hours"] = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    # First matching rule wins: OOM, then under-requested, then over-allocated.
    had_oom = df_pod["oom_strike_sum"] > 0
    under_requested = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2)
    over_allocated = (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50)
    df_pod["status"] = np.where(
        had_oom, "💥 OOM장애발생",
        np.where(under_requested, "⚠️ Request부족", np.where(over_allocated, "📉 과다할당", "✅ 최적화완료")),
    )
    return df_pod


def _write_namespace_reports(df_pod):
    """Write the daily per-namespace usage table and the namespace waste Pareto table."""
    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch=("cpu_usage_core_hours", "sum"),
        cpu_alloc_ch=("cpu_allocated_core_hours", "sum"),
        cpu_waste_ch=("cpu_waste_core_hours", "sum"),
        mem_used_gh=("mem_usage_gb_hours", "sum"),
        mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
        pv_used_gh=("pv_usage_gb_hours", "sum"),
        pv_alloc_gh=("pv_allocated_gb_hours", "sum"),
    ).reset_index()
    df_daily_ns["final_usage_score"] = (
        (df_daily_ns["cpu_alloc_ch"] * W_CPU)
        + (df_daily_ns["mem_alloc_gh"] * W_MEM)
        + (df_daily_ns["pv_alloc_gh"] * W_PV)
    ).round(1)
    df_daily_ns.to_parquet(MERGED_DIR / "daily_ns_usage.parquet", index=False)

    df_ns = df_pod.groupby("namespace").agg(
        total_waste_core_hours=("cpu_waste_core_hours", "sum"),
        minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"),
        total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
    ).reset_index().sort_values("total_waste_core_hours", ascending=False)
    # Fall back to 0.1 when nothing is wasted, to avoid dividing by zero.
    global_total_waste = df_ns["total_waste_core_hours"].sum() or 0.1
    waste_share = df_ns["total_waste_core_hours"] / global_total_waste * 100
    df_ns["waste_share_pct"] = waste_share.round(2)
    # Accumulate the unrounded shares so per-row rounding errors do not add up.
    df_ns["waste_cumsum_pct"] = waste_share.cumsum().round(2)
    df_ns.to_parquet(MERGED_DIR / "pareto_fixed_ns.parquet", index=False)


def main():
    """Run step 2: fetch RAW partitions, build the merged reports, then purge local RAW files.

    Steps:
        1. Resolve the time window from the CLI options and fetch missing hourly
           RAW partitions into ``RAW_DIR``.
        2. Load every ``prom_raw_*.parquet`` found in ``RAW_DIR``, keep the rows
           inside the window and on the selected cluster / workload domain.
        3. Write ``enriched_fixed_7d.parquet``, ``daily_ns_usage.parquet``,
           ``pareto_fixed_ns.parquet`` and ``meta_run_info.txt`` to ``MERGED_DIR``.
        4. Delete the local RAW files that were loaded.

    The function returns early, without writing reports or deleting RAW files,
    when no RAW file exists or when a filter leaves no rows.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    start_dt, end_dt = _resolve_time_window(args, datetime.now(KST))

    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)
    # Every cached partition is loaded (and later deleted), not only the ones
    # inside the window; rows outside the window are filtered out below.
    raw_files = sorted(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("\n🚨 [중단] 정산 가동할 물리 원천 파일이 존재하지 않습니다.")
        return

    print(f"\n📊 [데이터 병합] {len(raw_files)}개 시계열 파티션 로드 중...")
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y-%m-%d")
    # The bounds are compared as naive datetimes.
    # NOTE(review): this presumes "timestamp" is tz-naive KST wall-clock time — confirm.
    naive_start = start_dt.replace(tzinfo=None)
    naive_end = end_dt.replace(tzinfo=None)
    in_window = (df_raw["timestamp"] >= naive_start) & (df_raw["timestamp"] < naive_end)
    df_raw = df_raw[in_window].reset_index(drop=True)
    if df_raw.empty:
        return

    # .copy() detaches the filtered slice so the column assignments below do not
    # trigger pandas' SettingWithCopyWarning.
    df_raw = filter_target_nodes(df_raw, cluster_opt=args.cluster).copy()
    if df_raw.empty:
        return

    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster"] = [x[0] for x in infra_meta]
    df_raw["workload_domain"] = [x[1] for x in infra_meta]
    if args.workload_domain:
        df_raw = df_raw[df_raw["workload_domain"] == args.workload_domain].copy()
        # Same early exit as the filters above: nothing left to aggregate.
        if df_raw.empty:
            return

    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)
    # Collapse pooled workers onto one pod/container name per workload type so
    # they aggregate into a single row per group instead of one row per pod.
    pool_aliases = {
        "SPARK_EXECUTOR": ("spark-executor-pool", "executor"),
        "AIRFLOW_WORKER": ("airflow-worker-pool", "worker"),
    }
    for workload_type, (pod_alias, container_alias) in pool_aliases.items():
        is_pooled = df_raw["workload_type"] == workload_type
        df_raw["pod"] = np.where(is_pooled, pod_alias, df_raw["pod"])
        df_raw["container"] = np.where(is_pooled, container_alias, df_raw["container"])

    # Metrics absent from the RAW schema are filled with zeros so the
    # aggregation below can always reference them.
    target_fields = ["cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled", "mem_request", "mem_limit",
                     "mem_usage", "mem_rss", "oom_event", "pv_capacity", "pv_used"]
    for col in target_fields:
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    # Make sure the output directory exists before the first write.
    MERGED_DIR.mkdir(parents=True, exist_ok=True)
    df_pod = _aggregate_pod_metrics(df_raw)
    df_pod.to_parquet(MERGED_DIR / "enriched_fixed_7d.parquet", index=False)
    _write_namespace_reports(df_pod)

    # end_dt is exclusive, so the last covered calendar day is the one containing
    # the instant just before it. Subtracting a whole day was only right when
    # end_dt fell exactly on midnight.
    last_covered = end_dt - timedelta(microseconds=1)
    date_label = start_dt.strftime("%Y%m%d") + "_to_" + last_covered.strftime("%Y%m%d")
    target_info = args.workload_domain if args.workload_domain else args.cluster
    with open(MERGED_DIR / "meta_run_info.txt", "w", encoding="utf-8") as mf:
        mf.write(f"{target_info}@{date_label}")
    print(f"💾 요약 분석 가공 원부 배치 마감 완료. (위계: {target_info})")

    print("\n✂️ [작업 공간 최적화] 다음 단계(차트/엑셀)를 위해 로컬 RAW 파일을 일괄 청소합니다.")
    for f in raw_files:
        try:
            f.unlink()
            print(f" - 로컬 RAW 파티션 휘발 완수: {f.name}")
        except OSError as e:
            # Best-effort cleanup: report the failure and keep deleting the rest.
            print(f" - 파일 삭제 실패 주치의 경고 ({f.name}): {e}")
    print("\n🏁 === [Step2 배포 마감] 로컬 무상태 디스크 무결성이 완벽히 복구되었습니다. ===")
# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()