"""
step1_prom_fetch.py — 고정밀 FinOps 11대 리소스 지표 수집 및 데이터 레이크 적재 엔진
실행: python step1_prom_fetch.py --days 1 --step 1m
"""
import os
import sys
import argparse
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from pathlib import Path
try:
from config import RAW_DIR, DEFAULT_THANOS_URL, get_finops_promql_queries
except ImportError:
print("❌ 오류: config.py 파일이 같은 디렉토리에 없습니다.")
sys.exit(1)
KST = timezone(timedelta(hours=9))
def parse_arguments():
parser = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
parser.add_argument("--days", type=int, default=1, help="수집할 과거 일수")
parser.add_argument("--start-date", type=str, default=None, help="KST YYYY-MM-DD")
parser.add_argument("--end-date", type=str, default=None, help="KST YYYY-MM-DD")
parser.add_argument("--step", type=str, default="1m", help="프로메테우스 시계열 해상도 주기")
return parser.parse_args()
def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
"""
Thanos Query API를 호출하여 시계열 매트릭스(Matrix) 데이터를 가져옵니다.
"""
url = f"{thanos_url}/api/v1/query_range"
params = {
"query": query,
"start": start_ts,
"end": end_ts,
"step": step
}
try:
response = requests.get(url, params=params, timeout=60)
if response.status_code == 200:
res_json = response.json()
if res_json.get("status") == "success":
return res_json.get("data", {}).get("result", [])
return []
except Exception as e:
print(f" ⚠️ Thanos API 통신 에러: {str(e)}")
return []
def main():
args = parse_arguments()
thanos_url = os.getenv("THANOS_QUERY_URL", DEFAULT_THANOS_URL)
now_kst = datetime.now(KST)
if args.start_date and args.end_date:
start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
else:
yesterday = (now_kst - timedelta(days=args.days)).date()
start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)
print(f"🚀 [Step1 개시] Thanos 빅데이터 징수 파이프라인 구동")
print(f"🌐 타겟 Thanos Query: {thanos_url}")
print(f"📅 조회 시간 범위 (KST): {start_dt} ➡️ {end_dt} (해상도 주기: {args.step})")
base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
queries_matrix = get_finops_promql_queries(base_selector)
chunk_delta = timedelta(hours=6)
current_chunk_start = start_dt
while current_chunk_start < end_dt:
current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
start_ts = int(current_chunk_start.timestamp())
end_ts = int(current_chunk_end.timestamp())
print(f"\n⏳ 파티션 청크 [{chunk_str}] (6시간 분량) 징수 프로세스 가동...")
records_master = []
for metric_name, query_str in queries_matrix.items():
print(f" -> 🔍 메트릭 징수 중: {metric_name}...")
result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
if not result_set:
continue
print(f" ✅ 수집 완료: 고유 시계열 시퀀스 {len(result_set):,}개 검출")
for item in result_set:
metric_labels = item.get("metric", {})
values_list = item.get("values", [])
cluster = metric_labels.get("cluster", "prod-cluster")
namespace = metric_labels.get("namespace", "unknown")
pod = metric_labels.get("pod", "unknown")
container = metric_labels.get("container", "all-volume")
node = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]
for val_pair in values_list:
utc_ts = float(val_pair[0])
kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
metric_value = float(val_pair[1])
records_master.append({
"timestamp": kst_dt,
"cluster": cluster,
"namespace": namespace,
"pod": pod,
"container": container,
"node": node,
"metric_type": metric_name,
"value": metric_value
})
if records_master:
print(f"💾 파티션 [{chunk_str}] 메모리 구조화 및 Parquet 피벗 가동 중...")
df_chunk = pd.DataFrame(records_master)
df_pivot = df_chunk.pivot_values = df_chunk.groupby(
["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"]
)["value"].max().unstack().reset_index()
for expected_col in queries_matrix.keys():
if expected_col not in df_pivot.columns:
df_pivot[expected_col] = 0.0
out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
df_pivot.to_parquet(out_path:=str(out_file), index=False)
print(f"📦 [적재 성공] 레이크하우스 아카이빙 성료: {out_file.name} ({out_file.stat().st_size/1024:.0f} KB)")
else:
print(f"⚠️ 주의: 청크 [{chunk_str}] 기간에 Thanos 메트릭 수집 데이터가 전무합니다.")
current_chunk_start = current_chunk_end
print("\n🏁 === [Step1 성공 종료] 11대 거버넌스 메트릭 자원 레이크가 완벽히 완공되었습니다. ===")
if __name__ == "__main__":
main()