"""
step1_prom_fetch.py — 1시간 분할 징수 및 AIStor 업로드 후 로컬 캐시 보존 엔진
"""
import os
import sys
import argparse
import requests
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone
import config
from config import RAW_DIR, get_finops_promql_queries
# Korea Standard Time as a fixed UTC+9 offset.
KST = timezone(timedelta(hours=9))
def parse_arguments(argv=None):
    """Parse the command-line options of the fetcher.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, so existing
            zero-argument callers behave exactly as before.

    Returns:
        argparse.Namespace with the attributes ``days`` (int or None),
        ``start_date`` (str or None), ``end_date`` (str or None) and
        ``step`` (str, default ``"1m"``).
    """
    parser = argparse.ArgumentParser(
        description="Thanos Resource Fetcher for Data Lakehouse"
    )
    parser.add_argument(
        "--days",
        type=int,
        default=None,
        help="Number of days to look back from today (KST).",
    )
    parser.add_argument(
        "--start-date",
        type=str,
        default=None,
        help="First day to fetch, YYYY-MM-DD (KST). Use together with --end-date.",
    )
    parser.add_argument(
        "--end-date",
        type=str,
        default=None,
        help="Last day to fetch, YYYY-MM-DD (KST). Use together with --start-date.",
    )
    parser.add_argument(
        "--step",
        type=str,
        default="1m",
        help="Resolution step passed to the range query (default: 1m).",
    )
    return parser.parse_args(argv)
def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    """Run a range query against the Thanos HTTP API and return its series.

    Args:
        thanos_url: Base URL of the Thanos query service; the
            ``/api/v1/query_range`` path is appended to it.
        query: Query expression sent as the ``query`` parameter.
        start_ts: Value sent as the ``start`` parameter.
        end_ts: Value sent as the ``end`` parameter.
        step: Value sent as the ``step`` parameter.

    Returns:
        The ``data.result`` list of a successful response. Every failure
        (transport error, non-200 status, undecodable or unexpected JSON,
        ``status`` other than ``"success"``) is reported on stdout and
        yields an empty list, so callers can treat the fetch as best-effort.
    """
    # Tolerate a trailing slash on the configured base URL.
    url = f"{thanos_url.rstrip('/')}/api/v1/query_range"
    params = {"query": query, "start": start_ts, "end": end_ts, "step": step}

    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code != 200:
            # Previously this path returned [] silently, hiding API failures.
            print(f" ⚠️ Thanos API 비정상 응답: HTTP {response.status_code}")
            return []
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException covers transport problems; ValueError covers a
        # response body that is not valid JSON.
        print(f" ⚠️ Thanos API 에러: {str(e)}")
        return []

    if not isinstance(payload, dict) or payload.get("status") != "success":
        detail = payload.get("error", "unknown") if isinstance(payload, dict) else "unexpected payload"
        print(f" ⚠️ Thanos 쿼리 실패: {detail}")
        return []

    # "data" or "result" may be present but null; always hand back a list.
    data = payload.get("data")
    result = data.get("result") if isinstance(data, dict) else None
    return result if isinstance(result, list) else []
# Columns that, together with the metric name, identify one pivoted row.
_SERIES_KEY_COLUMNS = ("timestamp", "cluster", "namespace", "pod", "container", "node")

# Width of one fetch/upload partition.
_CHUNK_DELTA = timedelta(hours=1)


def _resolve_time_window(args, now_kst):
    """Return the ``(start_dt, end_dt)`` KST window selected by the CLI options."""
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # The end date is inclusive, so the window closes at the next midnight.
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
        return start_dt, end_dt

    if args.start_date or args.end_date:
        # Only one bound was given; say so instead of ignoring it silently.
        print(" ⚠️ --start-date/--end-date 는 함께 지정해야 합니다. 날짜 범위 인자를 무시합니다.")

    midnight_today = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
    # "is not None" so that an explicit --days 0 (today so far) is honoured
    # rather than falling through to the default window.
    if args.days is not None:
        return midnight_today - timedelta(days=args.days), now_kst

    # Default: the whole of yesterday.
    return midnight_today - timedelta(days=1), midnight_today


def _series_to_records(metric_name, result_set):
    """Flatten range-query series into one dict per (series, sample)."""
    records = []
    for series in result_set:
        labels = series.get("metric", {})
        node_label = labels.get("node", labels.get("instance", "unknown"))
        shared = {
            "cluster": labels.get("cluster", "prod-cluster"),
            "namespace": labels.get("namespace", "unknown"),
            "pod": labels.get("pod", "unknown"),
            "container": labels.get("container", "all-volume"),
            # Drop a ":port" suffix when the value came from "instance".
            "node": node_label.split(":")[0],
            "metric_type": metric_name,
        }
        for sample in series.get("values", []):
            records.append({
                **shared,
                # Stored as naive KST wall-clock time.
                "timestamp": datetime.fromtimestamp(float(sample[0]), tz=KST).replace(tzinfo=None),
                "value": float(sample[1]),
            })
    return records


def _pivot_records(records, expected_metrics):
    """Pivot long records to one column per metric, adding absent metrics as 0.0."""
    long_frame = pd.DataFrame(records)
    wide_frame = (
        long_frame.groupby([*_SERIES_KEY_COLUMNS, "metric_type"])["value"]
        .max()
        .unstack()
        .reset_index()
    )
    for metric in expected_metrics:
        if metric not in wide_frame.columns:
            wide_frame[metric] = 0.0
    return wide_frame


def main():
    """Fetch metrics in one-hour chunks, write Parquet locally and upload it.

    For every chunk of the selected time window, each configured query is
    run through ``query_thanos_matrix``; the samples are pivoted to one
    column per metric, written to ``RAW_DIR/prom_raw_<YYYYmmdd_HH>.parquet``
    and uploaded to the bucket under ``raw/``. The local file is kept.
    Chunks with no samples are skipped. Connection settings come from the
    ``THANOS_QUERY_URL`` and ``MINIO_*`` environment variables.
    """
    args = parse_arguments()

    thanos_url = os.getenv("THANOS_QUERY_URL", "http://thanos-query.internal.zone:9090")
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    endpoint = os.getenv("MINIO_ENDPOINT", "http://minio-service.internal.zone:9000")
    # NOTE(review): hard-coded fallback credentials; consider requiring the
    # environment variables to be set instead of defaulting.
    access_key = os.getenv("MINIO_ACCESS_KEY", "admin")
    secret_key = os.getenv("MINIO_SECRET_KEY", "password")

    start_dt, end_dt = _resolve_time_window(args, datetime.now(KST))

    print("🚀 [Step1 개시] Thanos 1시간 단위 분할 수집 가동")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    s3_client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
        config=boto3.session.Config(signature_version="s3v4"),
    )

    # Make sure the local cache directory exists before the first write.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    chunk_start = start_dt
    while chunk_start < end_dt:
        chunk_end = min(chunk_start + _CHUNK_DELTA, end_dt)
        chunk_str = chunk_start.strftime("%Y%m%d_%H")
        start_ts = int(chunk_start.timestamp())
        # query_range treats "end" as inclusive. Stopping one second short
        # keeps the sample at the chunk boundary out of this partition, so
        # it is not stored twice (here and in the next chunk).
        end_ts = max(start_ts, int(chunk_end.timestamp()) - 1)

        print(f"⏳ 파티션 징수 및 AIStor 업로드 중... [{chunk_str}]")

        records = []
        for metric_name, query_str in queries_matrix.items():
            result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
            if result_set:
                records.extend(_series_to_records(metric_name, result_set))

        if records:
            df_pivot = _pivot_records(records, queries_matrix.keys())
            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)
            object_key = f"raw/prom_raw_{chunk_str}.parquet"
            s3_client.upload_file(str(out_file), bucket_name, object_key)
            print(f" ✅ AIStor 백업 완료 ➡️ 로컬 캐시 지속 보존: {out_file.name}")
        else:
            print(f" ⚠️ 청크 [{chunk_str}] 데이터 공백 스킵.")

        chunk_start = chunk_end

    print("\n🏁 === [Step1 전송 성료] 로컬 작업공간에 원천 Parquet 풀이 보존되었습니다. ===")
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()