26Y01c6

Young-Kyoo Kim·약 11시간 전
"""
step1_prom_fetch.py — 1시간 분할 징수 및 AIStor 업로드 후 로컬 자동 휘발 파이프라인 (Boto3 안정성 패치판)
"""
import os  # 💡 [교정 1] os 모듈을 다이렉트로 임포트하여 환경변수 스코프 무결성 확보
import sys
import argparse
import requests
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone

# config 로드하여 환경변수 가드 자동 활성화
import config
from config import RAW_DIR, get_finops_promql_queries

KST = timezone(timedelta(hours=9))

def parse_arguments():
    parser = argparse.ArgumentParser(description="Thanos Resource Fetcher for Data Lakehouse")
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    parser.add_argument("--step", type=str, default="1m")
    return parser.parse_args()

def query_thanos_matrix(thanos_url, query, start_ts, end_ts, step):
    url = f"{thanos_url}/api/v1/query_range"
    params = {"query": query, "start": start_ts, "end": end_ts, "step": step}
    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code == 200:
            res_json = response.json()
            if res_json.get("status") == "success":
                return res_json.get("data", {}).get("result", [])
        return []
    except Exception as e:
        print(f"    ⚠️  Thanos API 에러: {str(e)}")
        return []

def main():
    args = parse_arguments()
    
    # 💡 [교정 2] os.getenv를 다이렉트로 쿼리하고 안전 보장용 2중 백업 default 값 매핑
    thanos_url  = os.getenv("THANOS_QUERY_URL", "http://thanos-query.internal.zone:9090")
    bucket_name = os.getenv("MINIO_RAW_BUCKET", "devops-test")
    endpoint    = os.getenv("MINIO_ENDPOINT", "http://minio-service.internal.zone:9000")
    access_key  = os.getenv("MINIO_ACCESS_KEY", "admin")
    secret_key  = os.getenv("MINIO_SECRET_KEY", "password")
    
    now_kst = datetime.now(KST)
    
    if args.start_date and args.end_date:
        start_dt = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=KST)
        end_dt = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=KST) + timedelta(days=1)
    elif args.days:
        start_dt = (now_kst - timedelta(days=args.days)).replace(hour=0, minute=0, second=0, microsecond=0)
        end_dt = now_kst
    else:
        yesterday = (now_kst - timedelta(days=1)).date()
        start_dt = datetime.combine(yesterday, datetime.min.time()).replace(tzinfo=KST)
        end_dt = datetime.combine(now_kst.date(), datetime.min.time()).replace(tzinfo=KST)

    print(f"🚀 [Step1] 타노스 지표 1시간 분할 수집 가동")
    print(f"📅 시간 범위 (KST): {start_dt} ➡️ {end_dt}")

    base_selector = '{container!="", namespace!~"kube-system|istio-system|monitoring"}'
    queries_matrix = get_finops_promql_queries(base_selector)

    # 💡 [교정 3] region_name="us-east-1"를 명시하여 botocore의 NoneType 정규식 파싱 에러를 원천 차단
    s3_client = boto3.client(
        "s3", 
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",  # ◀ 온프레미스 S3 호환 스토리지 연동의 필수 가드레일
        config=boto3.session.Config(signature_version="s3v4")
    )

    chunk_delta = timedelta(hours=1)
    current_chunk_start = start_dt

    while current_chunk_start < end_dt:
        chunk_str = current_chunk_start.strftime("%Y%m%d_%H")
        
        start_ts = int(current_chunk_start.timestamp())
        end_ts = int(current_chunk_end:=current_chunk_start + chunk_delta).timestamp()

        print(f"⏳ 파티션 세션 백업 중... [{chunk_str}]")
        records_master = []

        for metric_name, query_str in queries_matrix.items():
            result_set = query_thanos_matrix(thanos_url, query_str, start_ts, end_ts, args.step)
            if not result_set: continue

            for item in result_set:
                metric_labels = item.get("metric", {})
                values_list = item.get("values", [])

                cluster_label = metric_labels.get("cluster", "prod-cluster")
                namespace     = metric_labels.get("namespace", "unknown")
                pod           = metric_labels.get("pod", "unknown")
                container     = metric_labels.get("container", "all-volume")
                node          = metric_labels.get("node", metric_labels.get("instance", "unknown")).split(":")[0]

                for val_pair in values_list:
                    utc_ts = float(val_pair[0])
                    kst_dt = datetime.fromtimestamp(utc_ts, tz=timezone.utc).astimezone(KST).replace(tzinfo=None)
                    metric_value = float(val_pair[1])

                    records_master.append({
                        "timestamp": kst_dt, "cluster_label": cluster_label, "namespace": namespace,
                        "pod": pod, "container": container, "node": node,
                        "metric_type": metric_name, "value": metric_value
                    })

        if records_master:
            df_chunk = pd.DataFrame(records_master)
            df_pivot = df_chunk.groupby(
                ["timestamp", "cluster", "namespace", "pod", "container", "node", "metric_type"]
            )["value"].max().unstack().reset_index()

            for expected_col in queries_matrix.keys():
                if expected_col not in df_pivot.columns:
                    df_pivot[expected_col] = 0.0

            out_file = RAW_DIR / f"prom_raw_{chunk_str}.parquet"
            df_pivot.to_parquet(str(out_file), index=False)
            
            object_key = f"raw/prom_raw_{chunk_str}.parquet"
            print(f"   ⬆️  [오브젝트 스토리지 적재] Local ➡️ AIStor S3://{bucket_name}/{object_key}")
            
            # 💡 이제 부서명/버킷명/리전이 안전하게 바인딩되어 정상 업로드됩니다.
            s3_client.upload_file(str(out_path:=out_file), bucket_name, object_key)
            
            out_file.unlink()
            print(f"   ✂️  [작업공간 휘발 완수] 로컬 캐시 파일 {out_file.name} 완전히 소거됨.")
        else:
            print(f"   ⚠️  청크 [{chunk_str}] 수집 데이터가 없어 스킵합니다.")

        current_chunk_start = current_chunk_end

    print("\n🏁 === [Step1 완료] 1시간 단위 분할 수집 및 AIStor 영구 백업 완수 ===")

if __name__ == "__main__":
    main()

0개의 댓글