26Y01b3

Young-Kyoo Kim·약 18시간 전
"""
step1_prom_fetch.py — 고정밀 FinOps 11대 리소스 지표 수집 및 데이터 레이크 적재 엔진
[인자 고도화 반영: --days, --start-date, --end-date 멀티 핸들링 패치]

실행 (기본 어제 하루): python step1_prom_fetch.py
실행 (특정 일수 지정): python step1_prom_fetch.py --days 7
실행 (특정 기간 지정): python step1_prom_fetch.py --start-date 2026-06-01 --end-date 2026-06-07
"""

import os
import sys
import argparse
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from pathlib import Path

# config.py 설정 원부 로드
try:
    from config import RAW_DIR, DEFAULT_THANOS_URL, get_finops_promql_queries
except ImportError:
    print("❌ 오류: config.py 파일이 같은 디렉토리에 없습니다.")
    sys.exit(1)

KST = timezone(timedelta(hours=9))

def parse_arguments():
    parser = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
    # ─── ⏱️ [요청 반영] 날짜 기간 추적 인자 전격 탑재 ───
    parser.add_argument("--days", type=int, default=None, help="수집할 과거 일수 (날짜 인자가 없을 때 활성화)")
    parser.add_argument("--start-date", type=str, default=None, help="조회 시작 일자 (KST 기준 YYYY-MM-DD)")
    parser.add_argument("--end-date", type=str, default=None, help="조회 종료 일자 (KST 기준 YYYY-MM-DD)")
    parser.add_argument("--step", type=str, default="1m", help="프로메테우스 시계열 해상도 주기")
    return parser.parse_args()

def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    url = f"{thanos_url}/api/v1/query_range"
    params = {
        "query": query,
        "start": start_ts,
        "end": end_ts,
        "step": step
    }
    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code == 200:
            res_json = response.json()
            if res_json.get("status") == "success":
                return res_json.get("data", {}).get("result", [])
        return []
    except Exception as e:
        print(f"    ⚠️  Thanos API 통신 에러: {str(e)}")
        return []

def main():
    args = parse_arguments()
    thanos_url = os.getenv("THANOS_QUERY_URL", DEFAULT_THANOS_URL)
    now_kst = datetime.now(KST)
    
    # ─── ⏱️ [핵심 로직] 시간 인자 분기 처리 가드레일 ───
    if args.start_date and args.end_date:
        # 1. 특정 시작일과 종료일이 명시적으로 주어졌을 때
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        # 종료일은 해당 날짜의 23시 59분 59초까지 긁어야 하므로 하루를 더해 데드라인 셋업
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
        print(f"🎯 [인자 확인] 사용자 지정 기간 정산 모드 기동")
    elif args.days:
        # 2. 일수 제어 인자만 주어졌을 때
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
        print(f"🎯 [인자 확인] 최근 {args.days}일 기준 시계열 스캔 모드 기동")
    else:
        # 3. 아무 인자도 주지 않았을 때 폴백 가드 (기본값: 어제 자정 ~ 오늘 자정 전까지)
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)
        print(f"🎯 [인자 확인] 기본 폴백 어제자 전사 데이터 수집 모드 기동")

    print(f"🚀 [Step1 개시] Thanos 빅데이터 징수 파이프라인 구동")
    print(f"📅 조회 시간 범위 (KST): {start_dt.strftime('%Y-%m-%d %H:%M')} ➡️ {end_dt.strftime('%Y-%m-%d %H:%M')} (해상도: {args.step})")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    # 6시간 분할 아카이빙 루프
    chunk_delta = timedelta(hours=6)
    current_chunk_start = start_dt

    while current_chunk_start < end_dt:
        current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        
        start_ts = int(current_chunk_start.timestamp())
        end_ts = int(current_chunk_end.timestamp())

        print(f"\n⏳ 파티션 청크 [{chunk_str}] (6시간 분량) 징수 프로세스 가동...")
        records_master = []

        for metric_name, query_str in queries_matrix.items():
            print(f"  -> 🔍 메트릭 징수 중: {metric_name}...")
            result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
            
            if not result_set:
                continue
                
            print(f"     ✅ 수집 완료: 고유 시계열 시퀀스 {len(result_set):,}개 검출")

            for item in result_set:
                metric_labels = item.get("metric", {})
                values_list = item.get("values", [])

                cluster   = metric_labels.get("cluster", "prod-cluster")
                namespace = metric_labels.get("namespace", "unknown")
                pod       = metric_labels.get("pod", "unknown")
                container = metric_labels.get("container", "all-volume")
                node      = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]

                for val_pair in values_list:
                    utc_ts = float(val_pair[0])
                    kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
                    metric_value = float(val_pair[1])

                    records_master.append({
                        "timestamp": kst_dt,
                        "cluster": cluster,
                        "namespace": namespace,
                        "pod": pod,
                        "container": container,
                        "node": node,
                        "metric_type": metric_name,
                        "value": metric_value
                    })

        if records_master:
            print(f"💾 파티션 [{chunk_str}] 메모리 구조화 및 Parquet 피벗 가동 중...")
            df_chunk = pd.DataFrame(records_master)
            df_pivot = df_chunk.groupby(
                ["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"]
            )["value"].max().unstack().reset_index()

            for expected_col in queries_matrix.keys():
                if expected_col not in df_pivot.columns:
                    df_pivot[expected_col] = 0.0

            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)
            print(f"📦 [적재 성공] 레이크하우스 아카이빙 성료: {out_file.name} ({out_file.stat().st_size/1024:.0f} KB)")
        else:
            print(f"⚠️  주의: 청크 [{chunk_str}] 기간에 Thanos 메트릭 수집 데이터가 전무합니다.")

        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 성공 종료] 모든 지정 기간 타노스 메트릭 수집이 마감되었습니다. ===")

if __name__ == "__main__":
    main()
    
    
    
===
python -c "import pandas as pd; df = pd.read_parquet('./data/merged/enriched_fixed_7d.parquet'); print(df[['cluster_type', 'cluster', 'node']].drop_duplicates().to_string(index=False))"

0개의 댓글