26A09f

Young-Kyoo Kim·2026년 4월 9일

MinIO AIStor(Enterprise 버전)에서 제공하는 Inventory API는 수백만 개 이상의 객체를 보유한 버킷에서 파일 수와 용량을 계산할 때 가장 성능 부하가 적고 정확한 방법입니다.

기존의 mc du나 SDK의 list_objects는 실시간으로 모든 객체를 전수 조사하기 때문에 데이터가 많아질수록 서버 CPU와 네트워크에 큰 부담을 주지만, Inventory API는 백그라운드에서 주기적으로 객체 목록과 메타데이터(크기, 태그 등)를 스캔하여 보고서(CSV, Parquet 등) 형태로 생성해 둡니다.


1. Inventory 작업 설정 (YAML 사용)

MinIO AIStor에서는 YAML 설정 파일을 통해 어떤 데이터를 추출할지 정의합니다.

# inventory-config.yaml
apiVersion: v1
id: "daily-prefix-summary"    # 작업 ID
type: inventory
source:
  bucket: "my-data-bucket"    # 대상 버킷
  prefix: "specific-prefix/"  # 계산할 특정 경로
destination:
  bucket: "reports-bucket"    # 결과 보고서가 저장될 버킷
  format: "parquet"           # 분석에 용이한 Parquet 형식 추천 (CSV도 가능)
schedule:
  daily: "00:00"              # 매일 자정에 실행
fields:
  - size                      # 용량 계산을 위해 필수
  - lastModified
  - etag
  - storageClass

2. 작업을 API/CLI로 등록

작성한 YAML 파일을 MinIO에 등록하여 실행 예약(Job)을 생성합니다.

# mc 명령어를 사용하여 작업 생성
mc admin inventory add alias/ inventory-config.yaml

3. 결과 데이터 분석 (용량 및 파일 수 계산)

설정된 주기에 따라 reports-bucket에 보고서 파일이 생성됩니다. 이 파일에는 해당 Prefix 아래의 모든 객체 리스트와 각각의 size가 포함되어 있습니다.

효율적인 활용 방법:

  • DuckDB 또는 Trino 활용: 생성된 Parquet 파일을 SQL로 즉시 쿼리할 수 있습니다.
    -- DuckDB 예시
    SELECT count(*), sum(size) 
    FROM read_parquet('s3://reports-bucket/daily-prefix-summary/2026-04-09/*.parquet');
  • 성능 이점: 10억 개의 객체가 있더라도 실시간 리스팅 없이 미리 계산된 보고서 파일만 읽으면 되므로, 서버 성능에 영향을 주지 않고 수 초 내에 결과를 얻을 수 있습니다.

💡 어떤 상황에서 써야 할까요?

  • 실시간성보다 정확도와 성능이 중요할 때 (예: 전날 사용량 정산, 월간 리포트)
  • 객체 수가 1,000만 개 이상일 때 (일반적인 list 호출은 타임아웃이 발생할 수 있음)
  • 필터링 조건이 복잡할 때 (특정 태그나 특정 크기 이상의 파일만 골라낼 때)

최근 AIStor PoC를 진행 중이시라면, 이 Inventory API 기능을 통해 데이터 레이크하우스 구축 시의 스토리지 가시성을 확보하시는 것을 추천드립니다.


MinIO AIStor의 Inventory API는 대규모 데이터셋의 메타데이터를 비즈니스 로직에 맞게 가공하는 가장 표준적인 방법입니다. 요청하신 대로 [인벤토리 생성 → Python 후처리 → 통계 저장]으로 이어지는 전체 파이프라인과 자동화 코드를 설계해 드립니다.


1. 아키텍처 설계

  1. MinIO Inventory Job: 매일 특정 시각에 전체 객체 목록을 devops/ 경로에 Parquet(혹은 CSV) 형태로 저장합니다.
  2. Python Worker: devops/에 새로 생성된 리포트 파일을 읽어 하위 Prefix별로 그룹화(Group By) 연산을 수행합니다.
  3. Result Upload: 계산된 통계 결과(JSON 또는 CSV)를 stats/ 경로에 저장합니다.

2. Inventory Job 설정 (YAML)

먼저 MinIO 서버에 인벤토리 생성을 예약해야 합니다. 분석 효율을 위해 parquet 형식을 권장합니다.

# inventory-config.yaml
apiVersion: v1
id: "daily-devops-inventory"
type: inventory
source:
  bucket: "main-data"         # 분석 대상 버킷
  prefix: ""                  # 전체 혹은 특정 상위 경로
destination:
  bucket: "devops"            # 리포트가 저장될 위치
  format: "parquet"           # Python(Pandas/PyArrow)에서 읽기 매우 빠름
schedule:
  daily: "01:00"              # 매일 새벽 1시 실행
fields:
  - size
  - lastModified
  - etag

등록 명령어: mc admin inventory add alias/ inventory-config.yaml


3. Python 후처리 프로그램

pandaspyarrow 라이브러리를 사용하면 수백만 행의 데이터도 매우 빠르게 처리할 수 있습니다.

import pandas as pd
from minio import Minio
import io
import json
from datetime import datetime

# 1. MinIO 클라이언트 설정
client = Minio(
    "minio-endpoint:9000",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=False
)

def process_inventory():
    # 2. devops 버킷에서 가장 최근의 인벤토리 파일 가져오기
    # (실제 환경에서는 날짜별 경로를 탐색하는 로직 추가 권장)
    objects = client.list_objects("devops", recursive=True)
    latest_report = sorted([obj.object_name for obj in objects if obj.object_name.endswith('.parquet')])[-1]
    
    print(f"Processing report: {latest_report}")
    
    # 3. 인벤토리 파일 읽기 (메모리 효율을 위해 BytesIO 사용)
    response = client.get_object("devops", latest_report)
    df = pd.read_parquet(io.BytesIO(response.read()))
    
    # 4. 하위 Prefix 추출 로직 (예: depth 2까지)
    # 파일명이 'a/b/c.txt'일 경우 'a/b'를 prefix로 간주
    def get_prefix(path):
        parts = path.split('/')
        return '/'.join(parts[:2]) if len(parts) > 2 else 'root'

    df['prefix_group'] = df['name'].apply(get_prefix)
    
    # 5. 통계 계산
    stats = df.groupby('prefix_group').agg(
        file_count=('name', 'count'),
        total_size_bytes=('size', 'sum')
    ).reset_index()
    
    # 6. 결과를 JSON으로 변환하여 stats/ 경로에 저장
    today = datetime.now().strftime('%Y-%m-%d')
    result_json = stats.to_json(orient='records', indent=4)
    result_bytes = result_json.encode('utf-8')
    
    target_path = f"stats/daily_summary_{today}.json"
    client.put_object(
        "main-data", 
        target_path, 
        io.BytesIO(result_bytes), 
        len(result_bytes),
        content_type="application/json"
    )
    
    print(f"Success! Statistics saved to {target_path}")

if __name__ == "__main__":
    process_inventory()

4. 운영 팁

  • n8n 또는 Airflow 연동: 이 Python 스크립트를 도커라이징하여 n8nCode NodeAirflowPythonOperator로 스케줄링하면 실패 시 알람을 받는 등 관리가 훨씬 쉬워집니다.
  • 증분 분석: 데이터가 너무 크다면 전체 인벤토리가 아닌 lastModified 필드를 이용해 최근 24시간 내 변동분만 계산하도록 로직을 고도화할 수 있습니다.
  • 시각화: stats/ 경로에 저장된 JSON은 추후 Grafana의 JSON API Datasource를 통해 대시보드로 구성하기 매우 좋습니다.

이 파이프라인을 구축해 두시면 매일 아침 어떤 부서나 프로젝트에서 스토리지를 얼마나 점유하고 있는지 자동으로 리포트를 받으실 수 있을 거예요. 궁금한 구현 디테일이 있다면 말씀해 주세요!

0개의 댓글