"""
step1_prom_fetch.py — 1시간 분할 수집 및 AIStor 업로드 후 로컬 자동 휘발 파이프라인 (Boto3 안정성 패치판)
"""
import os
import sys
import argparse
import requests
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone
import config
from config import RAW_DIR, get_finops_promql_queries
# Korea Standard Time as a fixed UTC+09:00 tzinfo (KST observes no daylight saving,
# so a constant offset is exact).
KST = timezone(offset=timedelta(hours=9))
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    Options:
        --days:        look-back window in days (int, default None).
        --start-date:  range start as "YYYY-MM-DD" (default None).
        --end-date:    range end as "YYYY-MM-DD" (default None).
        --step:        query_range resolution string (default "1m").
    """
    cli = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
    option_specs = (
        ("--days", int, None),
        ("--start-date", str, None),
        ("--end-date", str, None),
        ("--step", str, "1m"),
    )
    for flag, value_type, fallback in option_specs:
        cli.add_argument(flag, type=value_type, default=fallback)
    return cli.parse_args()
def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    """Run a PromQL range query against Thanos and return the matrix result.

    Args:
        thanos_url: Base URL of the Thanos Query service.
        query: PromQL expression to evaluate.
        start_ts: Range start, passed through as the ``start`` parameter.
        end_ts: Range end, passed through as the ``end`` parameter.
        step: Resolution string passed as ``step`` (e.g. "1m").

    Returns:
        The list under ``data.result`` on success; otherwise an empty list.
        Failures are logged and never raised, so one failing query does not
        abort the rest of the chunk.
    """
    url = f"{thanos_url}/api/v1/query_range"
    params = {"query": query, "start": start_ts, "end": end_ts, "step": step}

    try:
        response = requests.get(url, params=params, timeout=60)
    except requests.RequestException as e:
        # Network-level failure (DNS, connection refused, timeout, ...).
        print(f" ⚠️ Thanos API 에러: {str(e)}")
        return []

    if response.status_code != 200:
        # Previously silent: a bad PromQL query (HTTP 400/422) looked like "no data".
        print(f" ⚠️ Thanos API 에러: HTTP {response.status_code} {response.text[:300]}")
        return []

    try:
        res_json = response.json()
    except ValueError as e:  # requests' JSONDecodeError subclasses ValueError
        print(f" ⚠️ Thanos API 에러: {str(e)}")
        return []

    if not isinstance(res_json, dict) or res_json.get("status") != "success":
        detail = res_json.get("error") if isinstance(res_json, dict) else None
        print(f" ⚠️ Thanos API 에러: {detail or 'unexpected response payload'}")
        return []

    data = res_json.get("data")
    result = data.get("result") if isinstance(data, dict) else None
    # Always hand back a list, even if the payload carries a null/odd "result".
    return result if isinstance(result, list) else []
def _resolve_time_window(args, now_kst):
    """Return the (start, end) KST-aware datetimes bounding the collection window.

    Priority: explicit --start-date/--end-date (end date inclusive), then
    --days (from local midnight N days ago until now), otherwise all of
    yesterday. Exits if only one of the two dates is given or the range is empty.
    """
    if args.start_date or args.end_date:
        if not (args.start_date and args.end_date):
            # Previously a lone date was silently ignored and the default window ran.
            sys.exit("❌ --start-date and --end-date must be given together.")
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # --end-date is inclusive: collect through the end of that day.
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
        if end_dt <= start_dt:
            sys.exit("❌ --end-date must not be earlier than --start-date.")
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)
    return start_dt, end_dt


def _collect_chunk_records(thanos_url, queries_matrix, start_ts, end_ts, step):
    """Run every metric query for one chunk and flatten the samples into row dicts.

    Samples at or after ``end_ts`` are dropped. query_range includes both
    bounds, so without this the boundary sample would also be written to the
    next hour's partition (duplicate rows across files).
    """
    records = []
    for metric_name, query_str in queries_matrix.items():
        result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, step)
        for item in result_set or []:
            metric_labels = item.get("metric", {})
            cluster_label = metric_labels.get("cluster", "prod-cluster")
            namespace = metric_labels.get("namespace", "unknown")
            pod = metric_labels.get("pod", "unknown")
            container = metric_labels.get("container", "all-volume")
            # Fall back to the scrape "instance" label and strip any ":port" suffix.
            node = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]
            for raw_ts, raw_value in item.get("values", []):
                utc_ts = float(raw_ts)
                if utc_ts >= end_ts:
                    continue  # half-open [start, end) chunk
                # Stored as naive KST wall-clock time.
                kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
                records.append({
                    # Key must match the groupby column in _pivot_chunk (was "cluster_label" -> KeyError).
                    "timestamp": kst_dt, "cluster": cluster_label, "namespace": namespace,
                    "pod": pod, "container": container, "node": node,
                    "metric_type": metric_name, "value": float(raw_value),
                })
    return records


def _pivot_chunk(records, metric_names):
    """Pivot long-format sample rows into one column per metric (max on collisions)."""
    df_chunk = pd.DataFrame(records)
    df_pivot = (
        df_chunk.groupby(["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"])["value"]
        .max()
        .unstack()
        .reset_index()
    )
    # Keep the column set stable across partitions even when a metric returned no data.
    for expected_col in metric_names:
        if expected_col not in df_pivot.columns:
            df_pivot[expected_col] = 0.0
    return df_pivot


def main():
    """Fetch Thanos metrics in 1-hour chunks and ship each chunk to object storage.

    For each hour in the resolved KST window, every query from
    ``get_finops_promql_queries`` is run. The samples are pivoted into one column
    per metric and written to ``RAW_DIR`` as Parquet. The file is uploaded to
    ``<bucket>/raw/prom_raw_<YYYYMMDD_HH>.parquet`` and the local copy is deleted.

    Environment: THANOS_QUERY_URL, MINIO_RAW_BUCKET, MINIO_ENDPOINT,
    MINIO_ACCESS_KEY, MINIO_SECRET_KEY (defaults below).
    """
    args = parse_arguments()
    thanos_url = os.getenv("THANOS_QUERY_URL", "http://thanos-query.internal.zone:9090")
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    endpoint = os.getenv("MINIO_ENDPOINT", "http://minio-service.internal.zone:9000")
    # NOTE(review): placeholder credential defaults; consider requiring these env vars outside dev.
    access_key = os.getenv("MINIO_ACCESS_KEY", "admin")
    secret_key = os.getenv("MINIO_SECRET_KEY", "password")

    now_kst = datetime.now(KST)
    start_dt, end_dt = _resolve_time_window(args, now_kst)

    print(f"🚀 [Step1] 타노스 지표 1시간 분할 수집 가동")
    print(f"📅 시간 범위 (KST): {start_dt} ➡️ {end_dt}")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    s3_client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
        config=boto3.session.Config(signature_version="s3v4"),
    )

    # to_parquet does not create parent directories.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    chunk_delta = timedelta(hours=1)
    current_chunk_start = start_dt
    while current_chunk_start < end_dt:
        # Clamp the last chunk so we never query past the requested window.
        current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        start_ts = int(current_chunk_start.timestamp())
        # Was int(<datetime>).timestamp() -> TypeError on the very first chunk.
        end_ts = int(current_chunk_end.timestamp())
        print(f"⏳ 파티션 세션 백업 중... [{chunk_str}]")

        records_master = _collect_chunk_records(thanos_url, queries_matrix, start_ts, end_ts, args.step)
        if records_master:
            df_pivot = _pivot_chunk(records_master, queries_matrix.keys())
            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)
            object_key = f"raw/prom_raw_{chunk_str}.parquet"
            print(f" ⬆️ [오브젝트 스토리지 적재] Local ➡️ AIStor S3://{bucket_name}/{object_key}")
            s3_client.upload_file(str(out_file), bucket_name, object_key)
            out_file.unlink()
            print(f" ✂️ [작업공간 휘발 완수] 로컬 캐시 파일 {out_file.name} 완전히 소거됨.")
        else:
            print(f" ⚠️ 청크 [{chunk_str}] 수집 데이터가 없어 스킵합니다.")
        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 완료] 1시간 단위 분할 수집 및 AIStor 영구 백업 완수 ===")
if __name__ == "__main__":
    # Start the pipeline only when executed as a script; importing the module has no side effects.
    main()