26Y01c3

Young-Kyoo Kim·약 16시간 전
"""
step1_prom_fetch.py — 1시간 분할 징수 및 AIStor 업로드 후 로컬 자동 휘발 파이프라인
"""
import sys
import argparse
import requests
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone

# config 로드하여 강제 변수 마스킹
import config
from config import RAW_DIR, get_finops_promql_queries

KST = timezone(timedelta(hours=9))

def parse_arguments():
    parser = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    parser.add_argument("--step", type=str, default="1m")
    return parser.parse_args()

def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    url = f"{thanos_url}/api/v1/query_range"
    params = {"query": query, "start": start_ts, "end": end_ts, "step": step}
    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code == 200:
            res_json = response.json()
            if res_json.get("status") == "success":
                return res_json.get("data", {}).get("result", [])
        return []
    except Exception as e:
        print(f"    ⚠️  Thanos API 에러: {str(e)}")
        return []

def main():
    args = parse_arguments()
    thanos_url = config.os.getenv("THANOS_QUERY_URL", "http://thanos-query.internal.zone:9090")
    bucket_name = config.os.getenv("MINIO_RAW_BUCKET")
    now_kst = datetime.now(KST)
    
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)

    print(f"🚀 [Step1] 타노스 지표 1시간 분할 수집 가동")
    print(f"📅 시간 범위 (KST): {start_dt} ➡️ {end_dt}")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    s3_client = boto3.client(
        "s3", endpoint_url=config.os.getenv("MINIO_ENDPOINT"),
        aws_access_key_id=config.os.getenv("MINIO_ACCESS_KEY"),
        aws_secret_access_key=config.os.getenv("MINIO_SECRET_KEY"),
        config=boto3.session.Config(signature_version="s3v4")
    )

    # 💡 [요청 반영] 게이트웨이 타임아웃 예방을 위해 무조건 1시간 단위 분할 루프 기동
    chunk_delta = timedelta(hours=1)
    current_chunk_start = start_dt

    while current_chunk_start < end_dt:
        current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        
        start_ts = int(current_chunk_start.timestamp())
        end_ts = int(current_chunk_end.timestamp())

        print(f"⏳ 파티션 세션 백업 중... [{chunk_str}]")
        records_master = []

        for metric_name, query_str in queries_matrix.items():
            result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
            if not result_set: continue

            for item in result_set:
                metric_labels = item.get("metric", {})
                values_list = item.get("values", [])

                cluster_label = metric_labels.get("cluster", "prod-cluster")
                namespace     = metric_labels.get("namespace", "unknown")
                pod           = metric_labels.get("pod", "unknown")
                container     = metric_labels.get("container", "all-volume")
                node          = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]

                for val_pair in values_list:
                    utc_ts = float(val_pair[0])
                    kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
                    metric_value = float(val_pair[1])

                    records_master.append({
                        "timestamp": kst_dt, "cluster_label": cluster_label, "namespace": namespace,
                        "pod": pod, "container": container, "node": node,
                        "metric_type": metric_name, "value": metric_value
                    })

        if records_master:
            df_chunk = pd.DataFrame(records_master)
            df_pivot = df_chunk.groupby(
                ["timestamp", "cluster_label", "namespace", "pod", "container", "node", "metric_type"]
            )["value"].max().unstack().reset_index()

            for expected_col in queries_matrix.keys():
                if expected_col not in df_pivot.columns:
                    df_pivot[expected_col] = 0.0

            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)
            
            object_key = f"raw/prom_raw_{chunk_str}.parquet"
            s3_client.upload_file(str(out_path:=out_file), bucket_name, object_key)
            out_file.unlink() # 로컬 즉시 삭제
        else:
            print(f"   ⚠️  청크 [{chunk_str}] 데이터 공백 스킵.")

        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 완료] 1시간 단위 최적화 전송 마감 ===")

if __name__ == "__main__":
    main()

0개의 댓글