"""
step1_prom_fetch.py — 1시간 분할 징수 및 AIStor 업로드 후 로컬 자동 휘발 파이프라인 (연산 및 스키마 무결성 완치판)
"""
import os
import sys
import argparse
import requests
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone
import config
from config import RAW_DIR, get_finops_promql_queries
# Fixed UTC+9 offset (Korea Standard Time). This script uses it to build the
# fetch-window boundaries and to convert sample timestamps before they are stored.
KST = timezone(timedelta(hours=9))
def parse_arguments():
    """Parse the command-line options that select the fetch window.

    Recognised options (all optional):
        --days        int, number of days to look back; defaults to ``None``.
        --start-date  str, defaults to ``None``.
        --end-date    str, defaults to ``None``.
        --step        str, query resolution step; defaults to ``"1m"``.

    Returns:
        argparse.Namespace: the parsed options, read from ``sys.argv``.
    """
    cli = argparse.ArgumentParser(
        description="Thanos Resource Fetcher for Data Lakehouse"
    )
    # (flag, value type, default) for every supported option.
    option_specs = (
        ("--days", int, None),
        ("--start-date", str, None),
        ("--end-date", str, None),
        ("--step", str, "1m"),
    )
    for flag, value_type, fallback in option_specs:
        cli.add_argument(flag, type=value_type, default=fallback)
    return cli.parse_args()
def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    """Run one range query against the Thanos HTTP API and return its series.

    Sends ``GET {thanos_url}/api/v1/query_range`` with a 60-second timeout.
    Every failure path is logged and reported as an empty list, so a failed
    query can be told apart from an empty metric in the logs.

    Args:
        thanos_url: Base URL of the query endpoint, without a trailing path.
        query: Query expression sent as the ``query`` parameter.
        start_ts: Value sent as the ``start`` parameter.
        end_ts: Value sent as the ``end`` parameter.
        step: Value sent as the ``step`` parameter (e.g. ``"1m"``).

    Returns:
        list: the ``data.result`` entry of a successful response, or ``[]``
        when the request fails, the HTTP status is not 200, the body is not
        valid JSON, or the payload does not report ``status == "success"``.
    """
    url = f"{thanos_url}/api/v1/query_range"
    params = {"query": query, "start": start_ts, "end": end_ts, "step": step}

    try:
        response = requests.get(url, params=params, timeout=60)
    except requests.RequestException as e:
        # Connection errors, timeouts, invalid URLs, ...
        print(f" ⚠️ Thanos API 에러: {str(e)}")
        return []

    if response.status_code != 200:
        # Previously dropped silently; surface the status and a body excerpt.
        print(f" ⚠️ Thanos API 에러: HTTP {response.status_code} - {response.text[:200]}")
        return []

    try:
        payload = response.json()
    except ValueError as e:
        # Raised when the body is not valid JSON.
        print(f" ⚠️ Thanos API 에러: {str(e)}")
        return []

    if not isinstance(payload, dict) or payload.get("status") != "success":
        status = payload.get("status") if isinstance(payload, dict) else None
        detail = payload.get("error") if isinstance(payload, dict) else None
        print(f" ⚠️ Thanos API 에러: status={status} error={detail}")
        return []

    # ``data`` (or ``result``) may be present but null; treat that as "no series".
    data = payload.get("data")
    if not isinstance(data, dict):
        return []
    return data.get("result") or []
def _resolve_time_range(args, now_kst):
    """Return the ``(start_dt, end_dt)`` KST window selected by the CLI options."""
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # The end date is inclusive: the window closes at the following midnight.
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
        return start_dt, end_dt

    if args.start_date or args.end_date:
        # A half-specified range used to be ignored without any notice.
        print(" ⚠️ --start-date 와 --end-date 는 함께 지정해야 합니다. 한쪽만 지정된 값은 무시됩니다.")

    # ``is not None`` so that ``--days 0`` means "today since midnight"
    # instead of silently falling through to the default window.
    if args.days is not None:
        start_dt = (now_kst - timedelta(days=args.days)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return start_dt, now_kst

    # Default window: the whole of yesterday, midnight to midnight.
    today_midnight = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)
    return today_midnight - timedelta(days=1), today_midnight


def _collect_chunk_records(thanos_url, queries_matrix, start_ts, end_ts, step):
    """Query every metric for ``[start_ts, end_ts)`` and return flat sample rows."""
    records = []
    for metric_name, query_str in queries_matrix.items():
        result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, step)
        if not result_set:
            continue
        for item in result_set:
            labels = item.get("metric", {})
            series_keys = {
                "cluster": labels.get("cluster", "prod-cluster"),
                "namespace": labels.get("namespace", "unknown"),
                "pod": labels.get("pod", "unknown"),
                "container": labels.get("container", "all-volume"),
                # Fall back to ``instance`` and drop a trailing ``:port``.
                "node": labels.get("node", labels.get("instance", "unknown")).split(":")[0],
            }
            for val_pair in item.get("values", []):
                utc_ts = float(val_pair[0])
                # The range-query ``end`` is inclusive, so the sample sitting
                # exactly on the chunk end is returned again as the first
                # sample of the next chunk. Keep each chunk half-open to
                # avoid duplicate rows across adjacent partition files.
                if utc_ts >= end_ts:
                    continue
                records.append({
                    # Stored as naive KST wall-clock time.
                    "timestamp": datetime.fromtimestamp(utc_ts, tz=KST).replace(tzinfo=None),
                    **series_keys,
                    "metric_type": metric_name,
                    "value": float(val_pair[1]),
                })
    return records


def _pivot_records(records, expected_columns):
    """Pivot sample rows to one column per metric, adding absent metrics as 0.0."""
    df_chunk = pd.DataFrame(records)
    df_pivot = df_chunk.groupby(
        ["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"]
    )["value"].max().unstack().reset_index()
    # Keep the schema stable even when a metric returned no series for this chunk.
    for expected_col in expected_columns:
        if expected_col not in df_pivot.columns:
            df_pivot[expected_col] = 0.0
    return df_pivot


def main():
    """Fetch metrics in one-hour chunks, upload each as Parquet, then delete it locally.

    Configuration is read from the environment (``THANOS_QUERY_URL``,
    ``MINIO_RAW_BUCKET``, ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``,
    ``MINIO_SECRET_KEY``) and from the command line (see ``parse_arguments``).

    For every one-hour chunk of the selected window the function runs all
    queries returned by ``get_finops_promql_queries``, pivots the samples to
    one column per metric, writes ``RAW_DIR/prom_raw_<YYYYmmdd_HH>.parquet``,
    uploads it to ``raw/prom_raw_<YYYYmmdd_HH>.parquet`` in the bucket and
    removes the local file. Chunks without any sample are skipped.
    """
    args = parse_arguments()

    thanos_url = os.getenv("THANOS_QUERY_URL", "http://thanos-query.internal.zone:9090")
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    endpoint = os.getenv("MINIO_ENDPOINT", "http://minio-service.internal.zone:9000")
    # NOTE(security): the fallback credentials below are hard-coded defaults;
    # real deployments should always provide them through the environment.
    access_key = os.getenv("MINIO_ACCESS_KEY", "admin")
    secret_key = os.getenv("MINIO_SECRET_KEY", "password")

    start_dt, end_dt = _resolve_time_range(args, datetime.now(KST))

    print("🚀 [Step1] 타노스 지표 1시간 분할 수집 가동")
    print(f"📅 시간 범위 (KST): {start_dt} ➡️ {end_dt}")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    s3_client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
        config=boto3.session.Config(signature_version="s3v4"),
    )

    # Make sure the local staging directory exists before the first write.
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    chunk_delta = timedelta(hours=1)
    current_chunk_start = start_dt
    while current_chunk_start < end_dt:
        # The last chunk may be shorter than one hour.
        current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        start_ts = int(current_chunk_start.timestamp())
        end_ts = int(current_chunk_end.timestamp())

        print(f"⏳ 파티션 세션 백업 중... [{chunk_str}]")
        records_master = _collect_chunk_records(
            thanos_url, queries_matrix, start_ts, end_ts, args.step
        )

        if records_master:
            df_pivot = _pivot_records(records_master, queries_matrix)

            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)

            object_key = f"raw/prom_raw_{chunk_str}.parquet"
            print(f" ⬆️ [오브젝트 스토리지 적재] Local ➡️ AIStor S3://{bucket_name}/{object_key}")
            s3_client.upload_file(str(out_file), bucket_name, object_key)

            # The local copy is only a staging file; drop it once uploaded.
            out_file.unlink()
            print(f" ✂️ [작업공간 휘발 완수] 로컬 캐시 파일 {out_file.name} 완전히 소거됨.")
        else:
            print(f" ⚠️ 청크 [{chunk_str}] 수집 데이터가 없어 스킵합니다.")

        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 완료] 1시간 단위 분할 수집 및 AIStor 영구 백업 완수 ===")
# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()