k8s_pod_scan.py

진웅·2025년 7월 3일

K8S Basics

목록 보기
15/39

#!/usr/bin/env python3
"""
Kubernetes Pod Image Information Collector
이 스크립트는 K8s 클러스터의 모든 Pod에서 사용하는 이미지 정보를 수집합니다.
"""

import subprocess
import json
import sys
from typing import List, Dict, Tuple
from collections import defaultdict
import argparse

def run_kubectl_command(command: List[str]) -> Tuple[bool, str]:
    """Run a kubectl command and report whether it succeeded.

    Args:
        command: Full argument vector to execute, including the leading
            ``kubectl``.

    Returns:
        ``(True, stdout)`` when the process exits with status 0, otherwise
        ``(False, "")``. A non-zero exit status or a missing executable is
        reported with ``print`` and is not raised to the caller.
    """
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,  # non-zero exit -> CalledProcessError
        )
    except subprocess.CalledProcessError as e:
        print(f"Error running kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return False, ""
    except FileNotFoundError:
        print("kubectl command not found. Please install kubectl and ensure it's in PATH.")
        return False, ""
    return True, result.stdout

def get_image_size(image_name: str) -> str:
    """Return the size of a container image using the local Docker CLI.

    The image is pulled first and then inspected for its ``Size`` field.

    Args:
        image_name: Image reference to pull and inspect.

    Returns:
        The size formatted by ``format_size``; ``"N/A"`` when the inspect
        command fails, either command times out, or the reported size is
        not an integer; ``"Docker not available"`` when the ``docker``
        executable cannot be found.
    """
    try:
        # Best-effort pull: the exit status is intentionally not checked, so
        # a failed pull simply falls through to the inspect call below.
        subprocess.run(
            ["docker", "pull", image_name],
            capture_output=True,
            text=True,
            timeout=60,
        )

        # Read the image size in bytes. The timeout keeps an unresponsive
        # Docker daemon from blocking the whole scan; TimeoutExpired is
        # handled below like any other lookup failure.
        result = subprocess.run(
            ["docker", "inspect", image_name, "--format={{.Size}}"],
            capture_output=True,
            text=True,
            check=True,
            timeout=60,
        )

        size_bytes = int(result.stdout.strip())
        return format_size(size_bytes)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError):
        return "N/A"
    except FileNotFoundError:
        return "Docker not available"

def format_size(size_bytes: int) -> str:
    """Convert a byte count into a human-readable string.

    The value is divided by 1024 until it is below 1024 and is rendered
    with one decimal place and the matching unit (B, KB, MB, GB, TB);
    anything still >= 1024 TB is reported in PB.

    Args:
        size_bytes: Size in bytes.

    Returns:
        A string such as ``"1.5 KB"``.
    """
    size = size_bytes  # work on a local so the parameter is not rebound
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"

def get_pods_info(namespace: str = None) -> List[Dict]:
    """Fetch pod objects via ``kubectl get pods -o json``.

    Args:
        namespace: Namespace to query. When empty or ``None`` the query is
            made with ``--all-namespaces``.

    Returns:
        The ``items`` list from kubectl's JSON output, or an empty list when
        the command fails, the output is not valid JSON, or ``items`` is
        missing or null.
    """
    command = ["kubectl", "get", "pods", "-o", "json"]

    if namespace:
        command.extend(["-n", namespace])
    else:
        command.append("--all-namespaces")

    success, output = run_kubectl_command(command)
    if not success:
        return []

    try:
        pods_data = json.loads(output)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []

    # `or []` also covers an explicit `"items": null`, so the function always
    # returns a list as annotated.
    return pods_data.get("items") or []

def extract_image_info(pods: List[Dict], get_sizes: bool = False) -> List[Dict]:
    """Extract one record per distinct container image from pod objects.

    For every pod the ``spec.initContainers`` list is examined first, then
    ``spec.containers``. Each image string is recorded only the first time
    it is seen, so the returned record names the first pod/container that
    used it. Containers without an image are skipped.

    Args:
        pods: Pod objects as dictionaries (the shape read here is
            ``metadata.name``, ``metadata.namespace`` and the two container
            lists under ``spec``).
        get_sizes: When true, ``get_image_size`` is called for each new
            image; otherwise the size is the literal ``"Not checked"``.

    Returns:
        A list of dicts with the keys ``pod_name``, ``namespace``,
        ``container_name``, ``container_type`` (``"init"`` or ``"main"``),
        ``image`` and ``size``.
    """
    image_info = []
    processed_images = set()

    # (container_type label, key under pod["spec"]); init containers first.
    container_sources = (("init", "initContainers"), ("main", "containers"))

    for pod in pods:
        # `or {}` / `or []` tolerate keys that are present but null.
        metadata = pod.get("metadata") or {}
        spec = pod.get("spec") or {}
        pod_name = metadata.get("name", "Unknown")
        namespace = metadata.get("namespace", "default")

        for container_type, spec_key in container_sources:
            for container in spec.get(spec_key) or []:
                image = container.get("image", "")
                if not image or image in processed_images:
                    continue
                processed_images.add(image)
                size = get_image_size(image) if get_sizes else "Not checked"
                image_info.append({
                    "pod_name": pod_name,
                    "namespace": namespace,
                    "container_name": container.get("name", ""),
                    "container_type": container_type,
                    "image": image,
                    "size": size,
                })

    return image_info

def summarize_images(image_info: List[Dict]) -> Dict:
    """Build usage statistics from a list of image records.

    Each record's ``image`` value is counted once per record, so the counts
    reflect how many records were passed in for that image.

    The registry is taken from the part of the image reference before the
    first ``/`` when that part looks like a host (contains ``.`` or ``:``,
    or is exactly ``localhost``); every other reference is attributed to
    ``docker.io``.

    Args:
        image_info: Records that each contain an ``image`` key.

    Returns:
        A dict with ``total_unique_images`` (int), ``most_used_images``
        (up to ten images with the highest counts, mapped to their counts)
        and ``registry_distribution`` (registry name mapped to record count).
    """
    image_count = defaultdict(int)
    registry_count = defaultdict(int)

    for info in image_info:
        image = info["image"]
        image_count[image] += 1

        # Only the first path component can name a registry host.
        first_part, slash, _ = image.partition("/")
        looks_like_host = (
            "." in first_part
            or ":" in first_part
            or first_part == "localhost"
        )
        if slash and looks_like_host:
            registry_count[first_part] += 1
        else:
            registry_count["docker.io"] += 1  # Docker Hub

    top_images = sorted(image_count.items(), key=lambda x: x[1], reverse=True)[:10]
    return {
        "total_unique_images": len(image_count),
        "most_used_images": dict(top_images),
        "registry_distribution": dict(registry_count),
    }

def print_results(image_info: List[Dict], summary: Dict, output_format: str = "table"):
    """Print the collected image records and, if given, their summary.

    Args:
        image_info: Records with the keys ``namespace``, ``pod_name``,
            ``container_name``, ``container_type``, ``size`` and ``image``.
        summary: Statistics with the keys ``total_unique_images``,
            ``registry_distribution`` and ``most_used_images``. An empty
            dict means "no summary": the summary sections of the table are
            skipped.
        output_format: ``"json"`` prints a single JSON document containing
            ``images`` and ``summary``; any other value prints the table.
    """
    if output_format == "json":
        output = {
            "images": image_info,
            "summary": summary,
        }
        print(json.dumps(output, indent=2, ensure_ascii=False))
        return

    # Table format
    print(f"\n{'='*80}")
    print("📊 Kubernetes Pod Image Information Report")
    print(f"{'='*80}")

    # An empty summary has none of the keys read below; skipping the section
    # avoids a KeyError in that case.
    if summary:
        # Summary
        print("\n📈 Summary:")
        print(f"  • Total unique images: {summary['total_unique_images']}")
        print("  • Registry distribution:")
        for registry, count in summary['registry_distribution'].items():
            print(f"    - {registry}: {count} images")

        # Most used images (only the first five entries are shown)
        print("\n🔥 Most used images:")
        for image, count in list(summary['most_used_images'].items())[:5]:
            print(f"    - {image} (used {count} times)")

    # Detailed list, ordered by namespace and then pod name
    print("\n📋 Detailed Image List:")
    print(f"{'Namespace':<15} {'Pod Name':<25} {'Container':<20} {'Type':<8} {'Size':<12} {'Image'}")
    print(f"{'-'*15} {'-'*25} {'-'*20} {'-'*8} {'-'*12} {'-'*50}")

    for info in sorted(image_info, key=lambda x: (x['namespace'], x['pod_name'])):
        print(f"{info['namespace']:<15} {info['pod_name']:<25} {info['container_name']:<20} "
              f"{info['container_type']:<8} {info['size']:<12} {info['image']}")

def main():
    """Command-line entry point: collect pod images and print a report.

    Parses the command-line options, verifies cluster access with
    ``kubectl cluster-info``, loads the pods, extracts their images,
    optionally summarizes them and prints the result. Exits with status 1
    when the cluster is unreachable, no pods are returned, or no image
    information is found.
    """
    parser = argparse.ArgumentParser(description="List Kubernetes pod images with sizes")
    parser.add_argument("-n", "--namespace", help="Specific namespace to check")
    parser.add_argument("-s", "--sizes", action="store_true",
                        help="Get actual image sizes (requires Docker)")
    parser.add_argument("-f", "--format", choices=["table", "json"], default="table",
                        help="Output format")
    parser.add_argument("--no-summary", action="store_true",
                        help="Skip summary statistics")

    args = parser.parse_args()

    # In JSON mode stdout must carry only the JSON document, so progress
    # messages are diverted to stderr; table mode keeps them on stdout.
    status_stream = sys.stderr if args.format == "json" else sys.stdout

    def status(message: str) -> None:
        print(message, file=status_stream)

    status("🔍 Collecting pod information from Kubernetes cluster...")

    # Check kubectl connectivity
    success, _ = run_kubectl_command(["kubectl", "cluster-info"])
    if not success:
        status("❌ Cannot connect to Kubernetes cluster. Please check your kubectl configuration.")
        sys.exit(1)

    # Collect pod information
    pods = get_pods_info(args.namespace)
    if not pods:
        status("❌ No pods found or unable to retrieve pod information.")
        sys.exit(1)

    status(f"📦 Found {len(pods)} pods. Extracting image information...")

    if args.sizes:
        status("⚠️  Getting image sizes... This may take a while as it requires Docker pulls.")

    # Extract image information
    image_info = extract_image_info(pods, args.sizes)

    if not image_info:
        status("❌ No image information found.")
        sys.exit(1)

    # Build statistics (an empty dict when --no-summary is given)
    summary = summarize_images(image_info) if not args.no_summary else {}

    # Print the report
    print_results(image_info, summary, args.format)

    status(f"\n✅ Collected information for {len(image_info)} unique images.")

# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

profile
bytebliss

0개의 댓글