"""
[2단계] 날짜 + 클러스터(COMPUTE/STORAGE) 복합 파티셔닝 및 데이터 마감 정산 엔진 (ILM 외주화 버전)
"""
import os
import argparse
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
import config
from config import (
RAW_DIR, MERGED_DIR, CLUSTER_NODE_PATTERNS,
get_workload_type, classify_node_infrastructure
)
# Fixed UTC+9 offset attached to every date-window boundary computed in this script.
KST = timezone(timedelta(hours=9))
# Weights applied to allocated CPU core-hours, memory GB-hours and PV GB-hours
# (in that order) when they are summed into `final_usage_score`.
W_CPU, W_MEM, W_PV = 1.0, 0.11, 0.02
def _iso_date_string(text):
    """Validate that *text* parses as YYYY-MM-DD and return it unchanged.

    The value is kept as a string so that code consuming ``args.start_date`` /
    ``args.end_date`` sees the same type as before; only the error reporting
    moves to argument-parsing time.
    """
    try:
        datetime.strptime(text, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {text!r}: expected format YYYY-MM-DD"
        ) from None
    return text


def parse_arguments(argv=None):
    """Parse the command-line options of the reprocessing engine.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse reads ``sys.argv[1:]``, which matches the previous
            zero-argument behaviour.

    Returns:
        argparse.Namespace with ``days`` (int or None), ``start_date`` and
        ``end_date`` (YYYY-MM-DD strings or None), ``cluster`` (one of
        "COMPUTE", "STORAGE", "ALL") and ``workload_domain`` (str or None).

    Raises:
        SystemExit: raised by argparse when an option is malformed, or when
            only one of ``--start-date`` / ``--end-date`` is supplied.
    """
    parser = argparse.ArgumentParser(description="FinOps Advanced Reprocessing Engine")
    parser.add_argument(
        "--days", type=int, default=None,
        help="number of days to look back from now",
    )
    parser.add_argument(
        "--start-date", type=_iso_date_string, default=None,
        help="first day of the window, YYYY-MM-DD (requires --end-date)",
    )
    parser.add_argument(
        "--end-date", type=_iso_date_string, default=None,
        help="last day of the window, YYYY-MM-DD, inclusive (requires --start-date)",
    )
    parser.add_argument(
        "--cluster", type=str, default="ALL", choices=["COMPUTE", "STORAGE", "ALL"],
        help="cluster partition to process",
    )
    parser.add_argument(
        "--workload-domain", type=str, default=None,
        help="restrict processing to a single workload domain",
    )
    args = parser.parse_args(argv)

    # The explicit window is only honoured when both ends are present; a lone
    # bound would otherwise be dropped silently and a different window used.
    if (args.start_date is None) != (args.end_date is None):
        parser.error("--start-date and --end-date must be provided together")
    return args
def get_minio_client():
    """Create the boto3 S3 client used to reach the object store.

    The endpoint URL and the access/secret key pair are read from the
    ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY``
    environment variables; an unset variable is forwarded as ``None``.
    The region is fixed to "us-east-1" and requests are signed with "s3v4".

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.
    """
    connection_options = {
        "endpoint_url": os.getenv("MINIO_ENDPOINT"),
        "aws_access_key_id": os.getenv("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": os.getenv("MINIO_SECRET_KEY"),
        "region_name": "us-east-1",
        "config": boto3.session.Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **connection_options)
def sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt):
    """Make sure the hourly raw parquet files for [start_dt, end_dt) exist locally.

    Walks the window in one-hour steps. For each step the expected file name is
    ``prom_raw_<YYYYmmdd_HH>.parquet``; a non-empty local copy under ``RAW_DIR``
    is reused, otherwise the object ``raw/<file name>`` is looked up in the
    bucket and downloaded.

    Args:
        s3_client: client exposing ``head_object`` and ``download_file``.
        bucket_name: bucket that holds the ``raw/`` objects.
        start_dt: inclusive start of the window (datetime).
        end_dt: exclusive end of the window (datetime).

    Returns:
        None. Failures are reported on stdout and the walk continues; a
        ``ClientError`` whose error code is "404" is skipped without a message.
    """
    one_hour = timedelta(hours=1)
    print(f"\n📡 [AIStor 스마트 캐시 엔진] 원천 데이터 가용성 체크 시작...")

    cursor = start_dt
    while cursor < end_dt:
        f_name = f"prom_raw_{cursor.strftime('%Y%m%d_%H')}.parquet"
        # Advance immediately: nothing below reads the cursor again, and this
        # lets the cache-hit branch leave the iteration with a plain `continue`.
        cursor += one_hour

        local_path = RAW_DIR / f_name
        cached_locally = local_path.exists() and local_path.stat().st_size > 0
        if cached_locally:
            print(f" ⚡ [Cache Hit] 로컬 캐시 자산 활용 (다운로드 스킵): {f_name}")
            continue

        object_key = f"raw/{f_name}"
        try:
            # Existence probe first, so a missing object is not announced as a download.
            s3_client.head_object(Bucket=bucket_name, Key=object_key)
            print(f" 📥 [Cache Miss] AIStor 백업으로부터 다운로드: {object_key}")
            s3_client.download_file(bucket_name, object_key, str(local_path))
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code != "404":
                print(f" ❌ AIStor 통신 실패 에러 ({f_name}): {str(e)}")
        except Exception as e:
            print(f" ⚠️ 일반 다운로드 예외 발생 ({f_name}): {str(e)}")
# Metric columns the pod-level aggregation reads; any that are absent from the
# raw data are created as 0.0 before aggregation.
_METRIC_FIELDS = [
    "cpu_request", "cpu_limit", "cpu_usage", "cpu_throttled",
    "mem_request", "mem_limit", "mem_usage", "mem_rss",
    "oom_event", "pv_capacity", "pv_used",
]

# Grouping keys that identify one output row of the pod-level ledger.
_POD_GROUP_KEYS = [
    "date", "cluster_type", "workload_domain", "namespace",
    "workload_type", "node", "pod", "container",
]

# Aggregated columns that are divided by 1024**3 after aggregation.
_GIB_SCALED_COLUMNS = [
    "mem_request_max", "mem_limit_max", "mem_usage_p95",
    "mem_rss_p95", "pv_capacity_max", "pv_used_p95",
]


def _p95(series):
    """Return the 95th percentile of *series*."""
    return series.quantile(0.95)


def _resolve_window(args, now_kst):
    """Return the half-open ``(start_dt, end_dt)`` KST window selected by *args*.

    Precedence: explicit ``start_date`` + ``end_date`` (end day inclusive),
    then a truthy ``days`` look-back ending at *now_kst*, otherwise the whole
    of the previous calendar day.
    """
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # +1 day so the end date itself is covered by the exclusive upper bound.
        end_dt = (datetime.strptime(args.end_date, "%Y-%m-%d") + timedelta(days=1)).replace(tzinfo=KST)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)
    return start_dt, end_dt


def _aggregate_pod_metrics(df_slice):
    """Collapse raw samples into one row per pod/container with derived metrics.

    *df_slice* must already contain every column in ``_METRIC_FIELDS`` and
    ``_POD_GROUP_KEYS``. The returned frame holds max / p95 aggregates, the
    allocated / used / waste hour figures, boolean flags and a ``status`` label.
    """
    df_pod = df_slice.groupby(_POD_GROUP_KEYS).agg(
        # Row count per group; the name implies one sample per minute — TODO confirm.
        minutes_running=("timestamp", "size"),
        cpu_request_max=("cpu_request", "max"),
        cpu_limit_max=("cpu_limit", "max"),
        cpu_usage_p95=("cpu_usage", _p95),
        cpu_throttled_max=("cpu_throttled", "max"),
        mem_request_max=("mem_request", "max"),
        mem_limit_max=("mem_limit", "max"),
        mem_usage_p95=("mem_usage", _p95),
        mem_rss_p95=("mem_rss", _p95),
        oom_strike_sum=("oom_event", "sum"),
        pv_capacity_max=("pv_capacity", "max"),
        pv_used_p95=("pv_used", _p95),
    ).reset_index().fillna(0)

    # Presumably converts bytes to GiB — confirm the unit against the collector.
    for col in _GIB_SCALED_COLUMNS:
        df_pod[col] = df_pod[col] / (1024**3)

    hours_running = df_pod["minutes_running"] / 60.0

    df_pod["cpu_allocated_core_hours"] = df_pod["cpu_request_max"] * hours_running
    df_pod["cpu_usage_core_hours"] = df_pod["cpu_usage_p95"] * hours_running
    df_pod["cpu_waste_core_hours"] = (df_pod["cpu_allocated_core_hours"] - df_pod["cpu_usage_core_hours"]).clip(lower=0)

    df_pod["mem_allocated_gb_hours"] = df_pod["mem_request_max"] * hours_running
    df_pod["mem_usage_gb_hours"] = df_pod["mem_usage_p95"] * hours_running
    df_pod["mem_waste_gb_hours"] = (df_pod["mem_allocated_gb_hours"] - df_pod["mem_usage_gb_hours"]).clip(lower=0)

    df_pod["pv_allocated_gb_hours"] = df_pod["pv_capacity_max"] * hours_running
    df_pod["pv_usage_gb_hours"] = df_pod["pv_used_p95"] * hours_running
    df_pod["pv_waste_gb_hours"] = (df_pod["pv_allocated_gb_hours"] - df_pod["pv_usage_gb_hours"]).clip(lower=0)

    df_pod["is_oom_killed"] = df_pod["oom_strike_sum"] > 0
    df_pod["has_no_request"] = (df_pod["cpu_request_max"] == 0) | (df_pod["mem_request_max"] == 0)
    df_pod["has_no_limit"] = (df_pod["cpu_limit_max"] == 0) | (df_pod["mem_limit_max"] == 0)
    df_pod["cpu_shortage_cores"] = (df_pod["cpu_usage_p95"] - df_pod["cpu_request_max"]).clip(lower=0)

    # First matching rule wins: OOM, then under-requested, then over-allocated.
    under_requested = (df_pod["cpu_shortage_cores"] > 0.5) | (df_pod["cpu_throttled_max"] > 0.2)
    over_allocated = (df_pod["cpu_waste_core_hours"] > 10) | (df_pod["pv_waste_gb_hours"] > 50)
    df_pod["status"] = np.where(
        df_pod["is_oom_killed"], "💥 OOM장애발생",
        np.where(under_requested, "⚠️ Request부족",
                 np.where(over_allocated, "📉 과다할당", "✅ 최적화완료")),
    )
    return df_pod


def _write_partition_outputs(df_pod, cluster_chunk, date_chunk):
    """Write the three parquet outputs of one (date, cluster) partition.

    Files land in ``MERGED_DIR / cluster_chunk``: the pod-level ledger
    (``daily_enriched_*``), the per-namespace daily usage (``daily_ns_usage_*``)
    and the per-namespace waste Pareto table (``pareto_ns_*``).
    """
    cluster_partition_dir = MERGED_DIR / cluster_chunk
    cluster_partition_dir.mkdir(parents=True, exist_ok=True)

    target_output_file = cluster_partition_dir / f"daily_enriched_{cluster_chunk}_{date_chunk}.parquet"
    df_pod.to_parquet(target_output_file, index=False)
    print(f" 💾 [저장 성료] 인프라 자산 정산 원부 생성됨 ➡️ {target_output_file.name}")

    df_daily_ns = df_pod.groupby(["date", "namespace"]).agg(
        cpu_used_ch=("cpu_usage_core_hours", "sum"),
        cpu_alloc_ch=("cpu_allocated_core_hours", "sum"),
        cpu_waste_ch=("cpu_waste_core_hours", "sum"),
        mem_used_gh=("mem_usage_gb_hours", "sum"),
        mem_alloc_gh=("mem_allocated_gb_hours", "sum"),
        pv_used_gh=("pv_usage_gb_hours", "sum"),
        pv_alloc_gh=("pv_allocated_gb_hours", "sum"),
    ).reset_index()
    df_daily_ns["final_usage_score"] = (
        (df_daily_ns["cpu_alloc_ch"] * W_CPU)
        + (df_daily_ns["mem_alloc_gh"] * W_MEM)
        + (df_daily_ns["pv_alloc_gh"] * W_PV)
    ).round(1)
    df_daily_ns.to_parquet(cluster_partition_dir / f"daily_ns_usage_{cluster_chunk}_{date_chunk}.parquet", index=False)

    df_ns = df_pod.groupby("namespace").agg(
        total_waste_core_hours=("cpu_waste_core_hours", "sum"),
        minutes_running_sum=("minutes_running", "sum"),
        container_cnt=("container", "count"),
        total_allocated_core_hours=("cpu_allocated_core_hours", "sum"),
    ).reset_index().sort_values("total_waste_core_hours", ascending=False)
    # Fall back to 0.1 so a partition with zero waste does not divide by zero.
    global_total_waste = df_ns["total_waste_core_hours"].sum() or 0.1
    # Accumulate the unrounded shares and round last; summing already-rounded
    # shares lets the per-row rounding error pile up in the cumulative column.
    waste_share = df_ns["total_waste_core_hours"] / global_total_waste * 100
    df_ns["waste_share_pct"] = waste_share.round(2)
    df_ns["waste_cumsum_pct"] = waste_share.cumsum().round(2)
    df_ns.to_parquet(cluster_partition_dir / f"pareto_ns_{cluster_chunk}_{date_chunk}.parquet", index=False)


def main():
    """Run the step-2 batch: sync raw data, then settle it per date and cluster.

    Steps:
      1. Parse CLI options and resolve the KST processing window.
      2. Fetch missing hourly raw files from the object store into ``RAW_DIR``.
      3. Load every ``prom_raw_*.parquet`` under ``RAW_DIR`` and keep the rows
         whose ``timestamp`` falls inside the window.
      4. Tag rows with cluster type, workload domain and workload type.
      5. For each (date, cluster) pair, aggregate to pod level and write the
         three parquet outputs under ``MERGED_DIR / <cluster>``.

    Returns early (after printing a notice) when no raw file exists or when
    the window contains no rows.
    """
    args = parse_arguments()
    s3_client = get_minio_client()
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")

    now_kst = datetime.now(KST)
    start_dt, end_dt = _resolve_window(args, now_kst)

    sync_down_raw_from_minio(s3_client, bucket_name, start_dt, end_dt)

    # NOTE(review): every cached raw file is loaded, not only those inside the
    # window; the window is enforced by the timestamp filter below.
    raw_files = list(RAW_DIR.glob("prom_raw_*.parquet"))
    if not raw_files:
        print("\n🚨 [중단] 정산 가동할 물리 원천 파일이 존재하지 않습니다.")
        return

    print(f"\n📊 [데이터 병합 및 인프라 매핑 일원화 스캔] {len(raw_files)}개 파티션 로드 중...")
    df_raw = pd.concat([pd.read_parquet(f) for f in raw_files], ignore_index=True)

    # The window bounds are compared as naive datetimes — assumes `timestamp`
    # holds naive KST wall-clock values; TODO confirm against the collector.
    window_start = start_dt.replace(tzinfo=None)
    window_end = end_dt.replace(tzinfo=None)
    in_window = (df_raw["timestamp"] >= window_start) & (df_raw["timestamp"] < window_end)
    df_raw = df_raw[in_window].reset_index(drop=True)
    if df_raw.empty:
        print("⚠️ 해당 기간 내 정산할 데이터 로우가 없습니다.")
        return

    # Derived after filtering so the string formatting only touches kept rows.
    df_raw["date"] = df_raw["timestamp"].dt.strftime("%Y%m%d")

    infra_meta = df_raw["node"].apply(classify_node_infrastructure)
    df_raw["cluster_type"] = [x[0] for x in infra_meta]
    df_raw["workload_domain"] = [x[1] for x in infra_meta]
    df_raw["workload_type"] = df_raw["pod"].apply(get_workload_type)

    # Fill absent metric columns once on the owned frame rather than on each
    # filtered slice, which would be a chained assignment on a derived frame.
    for col in _METRIC_FIELDS:
        if col not in df_raw.columns:
            df_raw[col] = 0.0

    target_dates = df_raw["date"].unique()
    target_clusters = ["COMPUTE", "STORAGE"] if args.cluster.upper() == "ALL" else [args.cluster.upper()]
    print(f"🎯 분할 연산 타겟 스케줄러 가동 -> 일자 풀: {target_dates} | 클러스터 풀: {target_clusters}")

    for date_chunk in target_dates:
        for cluster_chunk in target_clusters:
            df_slice = df_raw[(df_raw["date"] == date_chunk) & (df_raw["cluster_type"] == cluster_chunk)]
            if df_slice.empty:
                continue
            print(f" ⏳ [정산 컴파일 중] {date_chunk} ➡️ {cluster_chunk} 그룹 연산 기동...")

            if args.workload_domain:
                # NOTE(review): output file names carry only cluster and date, so a
                # domain-filtered run writes to the same paths as an unfiltered run.
                df_slice = df_slice[df_slice["workload_domain"] == args.workload_domain]
                if df_slice.empty:
                    continue

            df_pod = _aggregate_pod_metrics(df_slice)
            _write_partition_outputs(df_pod, cluster_chunk, date_chunk)

    print("\nℹ️ [안내] 원천 RAW 파일 자동 삭제 세션이 종료되었습니다.")
    print(" - 로컬 디스크 및 AIStor S3 버킷의 RAW 데이터는 사내 스토리지 인프라 ILM 정책에 의해 자동 순환 소거됩니다.")
    print("\n🏁 === [Step2 분할 정복 배치 완료] 날짜 및 클러스터별 파티셔닝 구조가 완벽히 구축되었습니다. ===")
# Run the batch only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()