26Y01a2

Young-Kyoo Kim·약 16시간 전
"""
step1_prom_fetch.py — 고정밀 FinOps 11대 리소스 지표 수집 및 데이터 레이크 적재 엔진
실행: python step1_prom_fetch.py --days 1 --step 1m
"""

import os
import sys
import argparse
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from pathlib import Path

# config.py 설정 원부 로드
try:
    from config import RAW_DIR, DEFAULT_THANOS_URL, get_finops_promql_queries
except ImportError:
    print("❌ 오류: config.py 파일이 같은 디렉토리에 없습니다.")
    sys.exit(1)

KST = timezone(timedelta(hours=9))

def parse_arguments():
    parser = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
    parser.add_argument("--days", type=int, default=1, help="수집할 과거 일수")
    parser.add_argument("--start-date", type=str, default=None, help="KST YYYY-MM-DD")
    parser.add_argument("--end-date", type=str, default=None, help="KST YYYY-MM-DD")
    parser.add_argument("--step", type=str, default="1m", help="프로메테우스 시계열 해상도 주기")
    return parser.parse_args()

def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    """
    Thanos Query API를 호출하여 시계열 매트릭스(Matrix) 데이터를 가져옵니다.
    """
    url = f"{thanos_url}/api/v1/query_range"
    params = {
        "query": query,
        "start": start_ts,
        "end": end_ts,
        "step": step
    }
    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code == 200:
            res_json = response.json()
            if res_json.get("status") == "success":
                return res_json.get("data", {}).get("result", [])
        return []
    except Exception as e:
        print(f"    ⚠️  Thanos API 통신 에러: {str(e)}")
        return []

def main():
    args = parse_arguments()
    thanos_url = os.getenv("THANOS_QUERY_URL", DEFAULT_THANOS_URL)
    
    # ⏱️ 1. 시간 윈도우 계산 (KST 기준 자정 정렬)
    now_kst = datetime.now(KST)
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
    else:
        yesterday = (now_kst - timedelta(days=args.days)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)

    print(f"🚀 [Step1 개시] Thanos 빅데이터 징수 파이프라인 구동")
    print(f"🌐 타겟 Thanos Query: {thanos_url}")
    print(f"📅 조회 시간 범위 (KST): {start_dt} ➡️ {end_dt} (해상도 주기: {args.step})")

    # 사내 멀티테넌트 전체 격리 수집용 셀렉터 기본값 정의
    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    # 6시간 단위 분할 청크 루프 기동 (Thanos 커널 부하 분산)
    chunk_delta = timedelta(hours=6)
    current_chunk_start = start_dt

    while current_chunk_start < end_dt:
        current_chunk_end = min(current_chunk_start + chunk_delta, end_dt)
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        
        # UTC 타임스탬프 초 단위 변환
        start_ts = int(current_chunk_start.timestamp())
        end_ts = int(current_chunk_end.timestamp())

        print(f"\n⏳ 파티션 청크 [{chunk_str}] (6시간 분량) 징수 프로세스 가동...")
        
        records_master = []

        # 11대 메트릭 동적 순회 수집
        for metric_name, query_str in queries_matrix.items():
            print(f"  -> 🔍 메트릭 징수 중: {metric_name}...")
            result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
            
            if not result_set:
                continue
                
            print(f"     ✅ 수집 완료: 고유 시계열 시퀀스 {len(result_set):,}개 검출")

            for item in result_set:
                metric_labels = item.get("metric", {})
                values_list = item.get("values", [])

                # 라벨 필드 유실 대비 디폴트 폴백 수렴
                cluster   = metric_labels.get("cluster", "prod-cluster")
                namespace = metric_labels.get("namespace", "unknown")
                pod       = metric_labels.get("pod", "unknown")
                container = metric_labels.get("container", "all-volume") # PV 메트릭용 폴백
                node      = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]

                for val_pair in values_list:
                    utc_ts = float(val_pair[0])
                    # 시계열 타임스탬프를 정산 가독성을 위해 KST형 datetime 객체로 전환
                    kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
                    metric_value = float(val_pair[1])

                    records_master.append({
                        "timestamp": kst_dt,
                        "cluster": cluster,
                        "namespace": namespace,
                        "pod": pod,
                        "container": container,
                        "node": node,
                        "metric_type": metric_name,
                        "value": metric_value
                    })

        # Parquet 파일 피벗 및 쓰기 레이어
        if records_master:
            print(f"💾 파티션 [{chunk_str}] 메모리 구조화 및 Parquet 피벗 가동 중...")
            df_chunk = pd.DataFrame(records_master)
            
            # 메트릭 유형별 컬럼 분할 피벗 연산 (Pivot)
            df_pivot = df_chunk.pivot_values = df_chunk.groupby(
                ["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"]
            )["value"].max().unstack().reset_index()

            # 신규 수집된 컬럼 누락 방지 안전망 선언
            for expected_col in queries_matrix.keys():
                if expected_col not in df_pivot.columns:
                    df_pivot[expected_col] = 0.0

            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(out_path:=str(out_file), index=False)
            print(f"📦 [적재 성공] 레이크하우스 아카이빙 성료: {out_file.name} ({out_file.stat().st_size/1024:.0f} KB)")
        else:
            print(f"⚠️  주의: 청크 [{chunk_str}] 기간에 Thanos 메트릭 수집 데이터가 전무합니다.")

        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 성공 종료] 11대 거버넌스 메트릭 자원 레이크가 완벽히 완공되었습니다. ===")

if __name__ == "__main__":
    main()

0개의 댓글