26A09g

Young-Kyoo Kim·2026년 4월 9일
import pandas as pd
from minio import Minio
import io
import json
from datetime import datetime

# 1. MinIO 클라이언트 설정 (교수님 환경에 맞게 수정)
client = Minio(
    "10.20.30.200:9000", # 앞서 말씀하신 IP 대역 중 마스터 노드
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=False
)

def process_inventory():
    try:
        # 2. devops 버킷에서 최신 리포트 찾기
        objects = client.list_objects("devops", recursive=True)
        # 시간순 정렬을 위해 리스트 확보 (파일명에 날짜/시간이 포함된 경우 기준)
        report_files = sorted([obj.object_name for obj in objects if obj.object_name.endswith('.parquet')])
        
        if not report_files:
            print("No inventory files found in 'devops' bucket.")
            return

        latest_report = report_files[-1]
        print(f"Processing report: {latest_report}")
        
        # 3. Parquet 읽기
        response = client.get_object("devops", latest_report)
        df = pd.read_parquet(io.BytesIO(response.read()))
        
        # [참고] 컬럼명 확인용 (필요시 주석 해제)
        # print("Columns found:", df.columns.tolist())

        # 4. 데이터 집계 (이미 prefix와 total_bytes가 있는 경우)
        # depth 필드를 활용해 특정 깊이의 데이터만 추출하거나 전체 합산
        # 여기서는 prefix별로 합산하는 예시입니다.
        stats = df.groupby('prefix').agg({
            'object_count': 'sum',
            'total_bytes': 'sum'
        }).reset_index()

        # 5. 결과 변환 및 저장
        today = datetime.now().strftime('%Y-%m-%d')
        result_json = stats.to_json(orient='records', indent=4, force_ascii=False)
        result_bytes = result_json.encode('utf-8')
        
        # 'main-data' 버킷의 'stats/' 경로에 저장
        target_path = f"stats/daily_summary_{today}.json"
        client.put_object(
            "main-data", 
            target_path, 
            io.BytesIO(result_bytes), 
            len(result_bytes),
            content_type="application/json"
        )
        
        print(f"Success! Statistics saved to {target_path}")
        print(stats.head()) # 콘솔에서 결과 일부 확인

    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == "__main__":
    process_inventory()

0개의 댓글