Extracting Image Information from Kubernetes Deployments

진웅 · July 9, 2025

K8S Basics


This post presents a Python script that extracts image information from Kubernetes deployments and writes it to an Excel file, together with a requirements.txt file listing the packages the script needs.

Usage:

  1. Install the packages:
    pip install -r requirements.txt

    requirements.txt

    pandas>=1.5.0
    openpyxl>=3.0.0
    requests>=2.28.0

  2. Run the script:
    python k8s_image_extractor.py

Key features:

  1. Query deployments in all namespaces: kubectl get deployments --all-namespaces -o json retrieves every deployment.

  2. Image information parsing:

    • Docker Hub images (nginx:latest)
    • Private registry images (gcr.io/project/image:tag)
    • Registries with a port (localhost:5000/image:tag)
  3. Image size lookup: the size is read from the full_size field returned by the Docker Hub API.

  4. Excel file generation:

    • Columns: Namespace, Deployment, Container, Registry, Repository, Tag, Size, Full Image
    • Automatic column width adjustment
    • File name that includes a timestamp
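
The three image formats listed under image parsing can be split with a few string operations. The following is a minimal sketch, not the script's own function: split_image_reference is a hypothetical name, and it assumes a colon counts as a tag separator only when it comes after the last slash, so a registry port is never mistaken for a tag. Digest references (image@sha256:...) are not covered.

```python
def split_image_reference(image):
    """Split an image reference into (registry, repository, tag)."""
    # Assumption: a colon is a tag separator only when it follows the last
    # slash; otherwise it belongs to a registry port such as localhost:5000.
    last_slash = image.rfind("/")
    last_colon = image.rfind(":")
    if last_colon > last_slash:
        name, tag = image[:last_colon], image[last_colon + 1:]
    else:
        name, tag = image, "latest"

    first, _, rest = name.partition("/")
    if rest and ("." in first or ":" in first or first == "localhost"):
        return first, rest, tag                 # explicit registry host
    if rest:
        return "docker.io", name, tag           # user/image on Docker Hub
    return "docker.io", f"library/{name}", tag  # official Docker Hub image


for ref in ["nginx:latest", "gcr.io/project/image:tag", "localhost:5000/image:tag"]:
    print(ref, "->", split_image_reference(ref))
```

Comparing the position of the last colon with the last slash keeps a reference such as localhost:5000/image, which has a port but no tag, from being split at the port.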

Sample output:

The Excel file contains information such as:

  • Repository: library/nginx, project/app (for gcr.io/project/app, the Registry column holds gcr.io)
  • Tag: latest, v1.0.0, stable
  • Size: 142.3 MB, 1.2 GB
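
As a worked example of how Size values like these come out of a raw byte count, the sketch below mirrors the format_size logic in the script, dividing by 1024 until the value fits the unit. The byte counts are illustrative inputs chosen to reproduce the two sample values; they are not measurements of real images.

```python
def format_size(size_bytes):
    """Render a byte count using 1024-based units, or "Unknown" for None."""
    if size_bytes is None:
        return "Unknown"
    size = float(size_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"


# Illustrative byte counts, not real image sizes
print(format_size(149_212_365))    # 142.3 MB
print(format_size(1_288_490_189))  # 1.2 GB
print(format_size(None))           # Unknown
```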

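Limitations:

  • Registries other than Docker Hub generally require authentication, so the script cannot retrieve size information from them.
  • For a private registry, the code must be modified to use that registry's API.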
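
One way such a modification might look, for a registry that implements the Docker Registry HTTP API v2 and accepts HTTP Basic authentication (both are assumptions about the target registry): fetch the image manifest and add up the sizes it records. The helper names manifest_size and fetch_manifest are hypothetical, and the sketch uses only the standard library. Sizes in a manifest are those of the compressed blobs, so they differ from what docker images reports locally.

```python
import base64
import json
import urllib.request

MANIFEST_V2 = "application/vnd.docker.distribution.manifest.v2+json"


def manifest_size(manifest):
    """Sum the config and layer sizes recorded in a schema 2 image manifest."""
    config_size = manifest.get("config", {}).get("size", 0)
    layers_size = sum(layer.get("size", 0) for layer in manifest.get("layers", []))
    return config_size + layers_size


def fetch_manifest(registry, repository, tag, username, password):
    """Fetch a manifest over HTTPS, assuming the registry accepts Basic auth."""
    url = f"https://{registry}/v2/{repository}/manifests/{tag}"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    request = urllib.request.Request(url, headers={
        "Accept": MANIFEST_V2,
        "Authorization": f"Basic {token}",
    })
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


# Offline example with a hand-written manifest fragment
sample = {"config": {"size": 1500}, "layers": [{"size": 2_000_000}, {"size": 500_000}]}
print(manifest_size(sample))
```

Registries that hand out bearer tokens instead of accepting Basic auth need an extra token request before the manifest call, which this sketch does not attempt.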

Source

#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor

This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.

Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""

import subprocess
import json
import pandas as pd
import requests
import sys
from datetime import datetime

def run_kubectl_command(command):
    """Execute kubectl command and return the output"""
    try:
        result = subprocess.run(
            command, 
            shell=True, 
            capture_output=True, 
            text=True, 
            check=True
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None

def get_all_deployments():
    """Get all deployments from all namespaces"""
    print("Getting all deployments...")
    command = "kubectl get deployments --all-namespaces -o json"
    output = run_kubectl_command(command)
    
    if output is None:
        return []
    
    try:
        deployments_data = json.loads(output)
        return deployments_data.get('items', [])
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []

def parse_image_info(image_string):
    """Parse image string to extract repository, tag, and registry info"""
    # Handle different image formats:
    # - nginx:latest
    # - docker.io/library/nginx:latest
    # - gcr.io/project/image:tag
    # - localhost:5000/image:tag
    
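    # Drop any digest (image@sha256:...) before looking for a tag;
    # digest-only references fall back to the 'latest' tag
    name = image_string.split('@', 1)[0]
    # A colon marks a tag only when it comes after the last slash;
    # otherwise it belongs to a registry port (localhost:5000/image)
    last_slash = name.rfind('/')
    last_colon = name.rfind(':')
    if last_colon > last_slash:
        image_repo = name[:last_colon]
        tag = name[last_colon + 1:]
    else:
        image_repo = name
        tag = 'latest'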
    
    # Extract registry and repository
    if '/' in image_repo:
        parts = image_repo.split('/')
        if '.' in parts[0] or ':' in parts[0]:  # Likely a registry
            registry = parts[0]
            repository = '/'.join(parts[1:])
        else:
            registry = 'docker.io'
            repository = image_repo
    else:
        registry = 'docker.io'
        repository = f'library/{image_repo}'
    
    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string
    }

def get_dockerhub_image_size(repository, tag):
    """Get image size from Docker Hub API"""
    try:
        # Docker Hub API v2: official images already carry the "library/"
        # prefix, so one URL pattern covers both cases
        url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"
        
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if 'full_size' in data:
                return data['full_size']
    except Exception as e:
        print(f"Error getting size for {repository}:{tag}: {e}")
    
    return None

def get_image_size(registry, repository, tag):
    """Get image size based on registry"""
    if registry == 'docker.io':
        return get_dockerhub_image_size(repository, tag)
    else:
        # For other registries, we can't easily get size without authentication
        # You might need to implement specific logic for your registries
        return None

def format_size(size_bytes):
    """Format size in bytes to human readable format"""
    if size_bytes is None:
        return "Unknown"
    
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f} TB"

def extract_images_from_deployments(deployments):
    """Extract image information from deployments"""
    images = []
    
    for deployment in deployments:
        namespace = deployment.get('metadata', {}).get('namespace', 'default')
        deployment_name = deployment.get('metadata', {}).get('name', 'unknown')
        
        spec = deployment.get('spec', {})
        template = spec.get('template', {})
        pod_spec = template.get('spec', {})
        
        # Extract from containers
        containers = pod_spec.get('containers', [])
        for container in containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': container.get('name', 'unknown'),
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
        
        # Extract from init containers
        init_containers = pod_spec.get('initContainers', [])
        for container in init_containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': f"{container.get('name', 'unknown')} (init)",
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
    
    return images

def main():
    """Main function"""
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)
    
    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)
    
    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    
    print(f"Found {len(deployments)} deployments")
    
    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    
    print(f"Found {len(images)} images")
    
    # Get image sizes (this might take a while for many images)
    print("Getting image sizes...")
    for i, image in enumerate(images):
        print(f"Processing {i+1}/{len(images)}: {image['full_image']}")
        size = get_image_size(image['registry'], image['repository'], image['tag'])
        image['size_bytes'] = size
        image['size_formatted'] = format_size(size)
    
    # Create DataFrame
    df = pd.DataFrame(images)
    
    # Sort by repository, then by tag
    df = df.sort_values(['repository', 'tag'])
    
    # Prepare final columns for Excel
    final_columns = [
        'namespace',
        'deployment', 
        'container',
        'registry',
        'repository',
        'tag',
        'size_formatted',
        'full_image'
    ]
    
    df_final = df[final_columns].copy()
    df_final.columns = [
        'Namespace',
        'Deployment',
        'Container',
        'Registry',
        'Repository',
        'Tag',
        'Size',
        'Full Image'
    ]
    
    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"
    
    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)
        
        # Auto-adjust column widths
        worksheet = writer.sheets['Deployment Images']
        for column in worksheet.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except:
                    pass
            adjusted_width = min(max_length + 2, 50)
            worksheet.column_dimensions[column_letter].width = adjusted_width
    
    print(f"Successfully exported {len(df_final)} image records to {filename}")
    
    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")
    
    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))

if __name__ == "__main__":
    main()
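
The run_kubectl_command helper above passes its command string through the shell. A variant that takes an argument list avoids shell quoting issues if values such as namespace names are ever interpolated into the command. This is a minimal sketch, not part of the script; run_command is a hypothetical name.

```python
import subprocess
import sys


def run_command(args):
    """Run a command given as an argument list; return stdout, or None on failure."""
    try:
        result = subprocess.run(args, capture_output=True, text=True, check=True)
        return result.stdout
    except (subprocess.CalledProcessError, FileNotFoundError) as error:
        # CalledProcessError: non-zero exit status; FileNotFoundError: binary missing
        print(f"Command failed: {error}", file=sys.stderr)
        return None


# Example call (needs kubectl on PATH and a reachable cluster):
# output = run_command(["kubectl", "get", "deployments", "--all-namespaces", "-o", "json"])
```

Catching FileNotFoundError also covers the case where kubectl is not installed, which shell=True would otherwise report only as a non-zero exit status from the shell.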

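Source 2 (with authentication added)

Set the Nexus credentials as environment variables before running this version:

export NEXUS_USERNAME="myuser"
export NEXUS_PASSWORD="mypass"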

#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor

This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.

Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""

import subprocess
import json
import pandas as pd
import re
import requests
from urllib.parse import urlparse
import sys
import os
from datetime import datetime

def run_kubectl_command(command):
    """Execute kubectl command and return the output"""
    try:
        result = subprocess.run(
            command, 
            shell=True, 
            capture_output=True, 
            text=True, 
            check=True
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None

def get_all_deployments():
    """Get all deployments from all namespaces"""
    print("Getting all deployments...")
    command = "kubectl get deployments --all-namespaces -o json"
    output = run_kubectl_command(command)
    
    if output is None:
        return []
    
    try:
        deployments_data = json.loads(output)
        return deployments_data.get('items', [])
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []

def parse_image_info(image_string):
    """Parse image string to extract repository, tag, and registry info"""
    # Handle different image formats:
    # - nginx:latest
    # - docker.io/library/nginx:latest
    # - gcr.io/project/image:tag
    # - localhost:5000/image:tag
    
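    # Drop any digest (image@sha256:...) before looking for a tag;
    # digest-only references fall back to the 'latest' tag
    name = image_string.split('@', 1)[0]
    # A colon marks a tag only when it comes after the last slash;
    # otherwise it belongs to a registry port (localhost:5000/image)
    last_slash = name.rfind('/')
    last_colon = name.rfind(':')
    if last_colon > last_slash:
        image_repo = name[:last_colon]
        tag = name[last_colon + 1:]
    else:
        image_repo = name
        tag = 'latest'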
    
    # Extract registry and repository
    if '/' in image_repo:
        parts = image_repo.split('/')
        if '.' in parts[0] or ':' in parts[0]:  # Likely a registry
            registry = parts[0]
            repository = '/'.join(parts[1:])
        else:
            registry = 'docker.io'
            repository = image_repo
    else:
        registry = 'docker.io'
        repository = f'library/{image_repo}'
    
    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string
    }

def get_dockerhub_image_size(repository, tag):
    """Get image size from Docker Hub API"""
    try:
        # Docker Hub API v2: official images already carry the "library/"
        # prefix, so one URL pattern covers both cases
        url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"
        
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if 'full_size' in data:
                return data['full_size']
    except Exception as e:
        print(f"Error getting size for {repository}:{tag}: {e}")
    
    return None

def get_nexus_image_size(registry, repository, tag, username=None, password=None):
    """Get image size from Nexus Registry API"""
    try:
        # Nexus Registry API v2
        # URL format: https://nexus.example.com/repository/docker-repo/v2/{repository}/manifests/{tag}
        
        # Try different API endpoints that Nexus commonly uses
        possible_urls = [
            f"https://{registry}/repository/docker-hosted/v2/{repository}/manifests/{tag}",
            f"https://{registry}/repository/docker-proxy/v2/{repository}/manifests/{tag}",
            f"https://{registry}/repository/docker-group/v2/{repository}/manifests/{tag}",
            f"https://{registry}/v2/{repository}/manifests/{tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-hosted&name={repository}&version={tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-proxy&name={repository}&version={tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-group&name={repository}&version={tag}"
        ]
        
        # Set up authentication
        auth = None
        if username and password:
            auth = (username, password)
        
        headers = {
            'Accept': 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.docker.distribution.manifest.v1+json'
        }
        
        for url in possible_urls:
            try:
                response = requests.get(url, auth=auth, headers=headers, timeout=10, verify=False)
                
                if response.status_code == 200:
                    data = response.json()
                    
                    # Handle different response formats
                    if 'items' in data and len(data['items']) > 0:
                        # Nexus search API response
                        item = data['items'][0]
                        if 'assets' in item and len(item['assets']) > 0:
                            # Sum up all layer sizes
                            total_size = 0
                            for asset in item['assets']:
                                if 'fileSize' in asset:
                                    total_size += asset['fileSize']
                            return total_size
                    
                    elif 'config' in data:
                        # Docker manifest v2 format
                        config_size = data['config'].get('size', 0)
                        layers_size = sum(layer.get('size', 0) for layer in data.get('layers', []))
                        return config_size + layers_size
                    
                    elif 'architecture' in data:
                        # Docker manifest v1 format carries no layer sizes,
                        # so only return a value if the registry supplied one
                        if data.get('size'):
                            return data['size']
                        
            except Exception as e:
                print(f"Error trying URL {url}: {e}")
                continue
        
        # If direct API calls fail, try to get manifest and calculate size
        manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
        response = requests.get(manifest_url, auth=auth, headers=headers, timeout=10, verify=False)
        
        if response.status_code == 200:
            manifest = response.json()
            if 'layers' in manifest:
                return sum(layer.get('size', 0) for layer in manifest['layers'])
        
    except Exception as e:
        print(f"Error getting size for {registry}/{repository}:{tag}: {e}")
    
    return None

def get_image_size(registry, repository, tag):
    """Get image size based on registry"""
    
    # Configuration for Nexus registry
    NEXUS_REGISTRIES = {
        # Add your Nexus registry configurations here
        # Format: 'registry_host': {'username': 'your_username', 'password': 'your_password'}
        # Example:
        # 'nexus.company.com': {'username': 'admin', 'password': 'admin123'},
        # 'nexus.company.com:8443': {'username': 'admin', 'password': 'admin123'},
    }
    
    if registry == 'docker.io':
        return get_dockerhub_image_size(repository, tag)
    elif registry in NEXUS_REGISTRIES:
        # Use configured credentials for Nexus
        creds = NEXUS_REGISTRIES[registry]
        return get_nexus_image_size(registry, repository, tag, 
                                   creds.get('username'), creds.get('password'))
    else:
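        # Try registry-specific environment variables first, e.g.
        # NEXUS_USERNAME_NEXUS_COMPANY_COM_8443 for nexus.company.com:8443.
        # Every non-alphanumeric character becomes "_" so the name is exportable.
        env_suffix = re.sub(r'[^A-Za-z0-9]', '_', registry).upper()
        username = os.getenv(f'NEXUS_USERNAME_{env_suffix}')
        password = os.getenv(f'NEXUS_PASSWORD_{env_suffix}')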
        
        if not username:
            username = os.getenv('NEXUS_USERNAME')
            password = os.getenv('NEXUS_PASSWORD')
        
        # Try to detect if it's a Nexus registry and attempt without auth first
        if 'nexus' in registry.lower() or any(port in registry for port in [':8081', ':8443', ':5000']):
            # Try without authentication first
            size = get_nexus_image_size(registry, repository, tag)
            if size is not None:
                return size
            
            # Try with environment variables
            if username and password:
                size = get_nexus_image_size(registry, repository, tag, username, password)
                if size is not None:
                    return size
            
            # If no size found, try with common default credentials
            common_creds = [
                ('admin', 'admin123'),
                ('admin', 'admin'),
                ('nexus', 'nexus'),
                ('docker', 'docker')
            ]
            
            for username, password in common_creds:
                size = get_nexus_image_size(registry, repository, tag, username, password)
                if size is not None:
                    return size
        
        # For other registries, we can't easily get size without authentication
        return None

def format_size(size_bytes):
    """Format size in bytes to human readable format"""
    if size_bytes is None:
        return "Unknown"
    
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f} TB"

def extract_images_from_deployments(deployments):
    """Extract image information from deployments"""
    images = []
    
    for deployment in deployments:
        namespace = deployment.get('metadata', {}).get('namespace', 'default')
        deployment_name = deployment.get('metadata', {}).get('name', 'unknown')
        
        spec = deployment.get('spec', {})
        template = spec.get('template', {})
        pod_spec = template.get('spec', {})
        
        # Extract from containers
        containers = pod_spec.get('containers', [])
        for container in containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': container.get('name', 'unknown'),
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
        
        # Extract from init containers
        init_containers = pod_spec.get('initContainers', [])
        for container in init_containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': f"{container.get('name', 'unknown')} (init)",
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
    
    return images

def main():
    """Main function"""
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)
    
    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)
    
    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    
    print(f"Found {len(deployments)} deployments")
    
    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    
    print(f"Found {len(images)} images")
    
    # Get image sizes (this might take a while for many images)
    print("Getting image sizes...")
    for i, image in enumerate(images):
        print(f"Processing {i+1}/{len(images)}: {image['full_image']}")
        size = get_image_size(image['registry'], image['repository'], image['tag'])
        image['size_bytes'] = size
        image['size_formatted'] = format_size(size)
    
    # Create DataFrame
    df = pd.DataFrame(images)
    
    # Sort by repository, then by tag
    df = df.sort_values(['repository', 'tag'])
    
    # Prepare final columns for Excel
    final_columns = [
        'namespace',
        'deployment', 
        'container',
        'registry',
        'repository',
        'tag',
        'size_formatted',
        'full_image'
    ]
    
    df_final = df[final_columns].copy()
    df_final.columns = [
        'Namespace',
        'Deployment',
        'Container',
        'Registry',
        'Repository',
        'Tag',
        'Size',
        'Full Image'
    ]
    
    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"
    
    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)
        
        # Auto-adjust column widths
        worksheet = writer.sheets['Deployment Images']
        for column in worksheet.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except:
                    pass
            adjusted_width = min(max_length + 2, 50)
            worksheet.column_dimensions[column_letter].width = adjusted_width
    
    print(f"Successfully exported {len(df_final)} image records to {filename}")
    
    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")
    
    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))

if __name__ == "__main__":
    main()
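
The credential lookup order in get_image_size above (a registry-specific variable first, then the generic NEXUS_USERNAME and NEXUS_PASSWORD pair) can be isolated into a small function that is easy to test without a registry. This is a minimal sketch; lookup_credentials is a hypothetical name, and the rule of turning every non-alphanumeric character of the host into an underscore is an assumption that extends the script's handling of dots and colons.

```python
import os
import re


def lookup_credentials(registry, environ=os.environ):
    """Return (username, password) for a registry, or (None, None) if unset."""
    # nexus.company.com:8443 -> NEXUS_COMPANY_COM_8443
    suffix = re.sub(r"[^A-Za-z0-9]", "_", registry).upper()
    candidates = (
        (f"NEXUS_USERNAME_{suffix}", f"NEXUS_PASSWORD_{suffix}"),  # per registry
        ("NEXUS_USERNAME", "NEXUS_PASSWORD"),                      # generic fallback
    )
    for user_key, pass_key in candidates:
        username, password = environ.get(user_key), environ.get(pass_key)
        if username and password:
            return username, password
    return None, None


# Example with an explicit mapping instead of the real environment
env = {"NEXUS_USERNAME": "myuser", "NEXUS_PASSWORD": "mypass"}
print(lookup_credentials("nexus.company.com:8443", env))
```

Passing the environment as a parameter keeps the function free of global state, so the precedence rules can be checked with plain dictionaries.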

Source 3 (estimating size through workarounds)

#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor

This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.

Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""

import subprocess
import json
import pandas as pd
import re
import requests
from urllib.parse import urlparse
import sys
import os
from datetime import datetime

def run_kubectl_command(command):
    """Execute kubectl command and return the output"""
    try:
        result = subprocess.run(
            command, 
            shell=True, 
            capture_output=True, 
            text=True, 
            check=True
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None

def get_all_deployments():
    """Get all deployments from all namespaces"""
    print("Getting all deployments...")
    command = "kubectl get deployments --all-namespaces -o json"
    output = run_kubectl_command(command)
    
    if output is None:
        return []
    
    try:
        deployments_data = json.loads(output)
        return deployments_data.get('items', [])
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []

def parse_image_info(image_string):
    """Parse image string to extract repository, tag, and registry info"""
    # Handle different image formats:
    # - nginx:latest
    # - docker.io/library/nginx:latest
    # - gcr.io/project/image:tag
    # - localhost:5000/image:tag
    
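    # Drop any digest (image@sha256:...) before looking for a tag;
    # digest-only references fall back to the 'latest' tag
    name = image_string.split('@', 1)[0]
    # A colon marks a tag only when it comes after the last slash;
    # otherwise it belongs to a registry port (localhost:5000/image)
    last_slash = name.rfind('/')
    last_colon = name.rfind(':')
    if last_colon > last_slash:
        image_repo = name[:last_colon]
        tag = name[last_colon + 1:]
    else:
        image_repo = name
        tag = 'latest'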
    
    # Extract registry and repository
    if '/' in image_repo:
        parts = image_repo.split('/')
        if '.' in parts[0] or ':' in parts[0]:  # Likely a registry
            registry = parts[0]
            repository = '/'.join(parts[1:])
        else:
            registry = 'docker.io'
            repository = image_repo
    else:
        registry = 'docker.io'
        repository = f'library/{image_repo}'
    
    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string
    }

def get_dockerhub_image_size(repository, tag):
    """Get image size from Docker Hub API"""
    try:
        # Docker Hub API v2: official images already carry the "library/"
        # prefix, so one URL pattern covers both cases
        url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"
        
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if 'full_size' in data:
                return data['full_size']
    except Exception as e:
        print(f"Error getting size for {repository}:{tag}: {e}")
    
    return None

def get_image_size_from_local_docker(full_image):
    """Get image size from local Docker daemon"""
    try:
        # Try to get image info from local Docker.
        # Braces are doubled because this is an f-string: the command
        # receives the Go template {{.Size}}, which prints no header row
        result = subprocess.run(
            f"docker images {full_image} --format '{{{{.Size}}}}'",
            shell=True,
            capture_output=True,
            text=True
        )

        if result.returncode == 0:
            lines = result.stdout.strip().split('\n')
            if lines and lines[0].strip():
                return parse_docker_size(lines[0].strip())
                
    except Exception as e:
        print(f"Error getting local Docker image size: {e}")
    
    return None

def get_image_size_from_kubectl_describe(namespace, deployment_name, container_name):
    """Get image size from kubectl describe pod"""
    try:
        # Get pods for the deployment (assumes the pods carry an
        # app=<deployment name> label, which is a convention, not a guarantee)
        result = subprocess.run(
            f"kubectl get pods -n {namespace} -l app={deployment_name} -o json",
            shell=True,
            capture_output=True,
            text=True
        )
        
        if result.returncode == 0:
            pods_data = json.loads(result.stdout)
            if pods_data.get('items'):
                pod_name = pods_data['items'][0]['metadata']['name']
                
                # Describe the pod to get image info
                result = subprocess.run(
                    f"kubectl describe pod {pod_name} -n {namespace}",
                    shell=True,
                    capture_output=True,
                    text=True
                )
                
                if result.returncode == 0:
                    output = result.stdout
                    # Look for image size in the output
                    lines = output.split('\n')
                    for line in lines:
                        if 'Size:' in line or 'Image Size:' in line:
                            size_match = re.search(r'(\d+\.?\d*)\s*(MB|GB|KB|B)', line)
                            if size_match:
                                return parse_size_string(size_match.group(1), size_match.group(2))
                                
    except Exception as e:
        print(f"Error getting image size from kubectl: {e}")
    
    return None

def get_image_size_from_docker_history(full_image):
    """Get image size from docker history command"""
    try:
        # Braces are doubled because this is an f-string: the command
        # receives the Go template {{.Size}}, which prints no header row
        result = subprocess.run(
            f"docker history {full_image} --format '{{{{.Size}}}}'",
            shell=True,
            capture_output=True,
            text=True
        )

        if result.returncode == 0:
            lines = result.stdout.strip().split('\n')
            total_size = 0
            for line in lines:
                if line.strip() and line.strip() != '0B':
                    size = parse_docker_size(line.strip())
                    if size:
                        total_size += size

            return total_size if total_size > 0 else None
            
    except Exception as e:
        print(f"Error getting image history: {e}")
    
    return None

def estimate_image_size_by_base_image(repository, tag):
    """Estimate image size based on common base images"""
    
    # Common base image sizes (approximate in bytes)
    base_image_sizes = {
        'alpine': 5 * 1024 * 1024,          # ~5MB
        'ubuntu': 72 * 1024 * 1024,         # ~72MB  
        'debian': 124 * 1024 * 1024,        # ~124MB
        'centos': 200 * 1024 * 1024,        # ~200MB
        'node': 900 * 1024 * 1024,          # ~900MB
        'nginx': 135 * 1024 * 1024,         # ~135MB
        'redis': 113 * 1024 * 1024,         # ~113MB
        'postgres': 314 * 1024 * 1024,      # ~314MB
        'mysql': 544 * 1024 * 1024,         # ~544MB
        'openjdk': 470 * 1024 * 1024,       # ~470MB
        'python': 885 * 1024 * 1024,        # ~885MB
        'golang': 862 * 1024 * 1024,        # ~862MB
        'busybox': int(1.4 * 1024 * 1024),  # ~1.4MB (kept as an integer byte count)
        'scratch': 0,                        # 0MB
    }
    
    # Check if repository contains known base image names
    repo_lower = repository.lower()
    for base_name, size in base_image_sizes.items():
        if base_name in repo_lower:
            # Add some estimation for application layers (20-200MB typically)
            if base_name in ['alpine', 'busybox', 'scratch']:
                app_layer_estimate = 50 * 1024 * 1024  # 50MB for lightweight apps
            else:
                app_layer_estimate = 150 * 1024 * 1024  # 150MB for regular apps
            
            return size + app_layer_estimate
    
    # Default estimation for unknown images
    return 200 * 1024 * 1024  # 200MB default

def parse_docker_size(size_str):
    """Parse Docker size string to bytes"""
    if not size_str or size_str == '0B':
        return 0
    
    # Docker prints kilobytes as 'kB', so match units case-insensitively
    size_match = re.search(r'(\d+\.?\d*)\s*(TB|GB|MB|KB|B)', size_str, re.IGNORECASE)
    if size_match:
        return parse_size_string(size_match.group(1), size_match.group(2).upper())
    
    return None

def parse_size_string(size_value, unit):
    """Convert size string to bytes"""
    try:
        size_float = float(size_value)
        unit_multipliers = {
            'B': 1,
            'KB': 1024,
            'MB': 1024 * 1024,
            'GB': 1024 * 1024 * 1024,
            'TB': 1024 * 1024 * 1024 * 1024
        }
        
        return int(size_float * unit_multipliers.get(unit, 1))
    except (TypeError, ValueError):
        return None

def get_image_size_alternative_methods(registry, repository, tag, full_image, namespace=None, deployment_name=None, container_name=None):
    """Try alternative methods to get image size without authentication"""
    
    print(f"  Trying alternative methods for {full_image}...")
    
    # Method 1: Try local Docker daemon
    size = get_image_size_from_local_docker(full_image)
    if size:
        print(f"  → Found size from local Docker: {format_size(size)}")
        return size
    
    # Method 2: Try Docker history
    size = get_image_size_from_docker_history(full_image)
    if size:
        print(f"  → Found size from Docker history: {format_size(size)}")
        return size
    
    # Method 3: Try kubectl describe pod
    if namespace and deployment_name and container_name:
        size = get_image_size_from_kubectl_describe(namespace, deployment_name, container_name)
        if size:
            print(f"  → Found size from kubectl describe: {format_size(size)}")
            return size
    
    # Method 4: Try to pull image and get size (if Docker is available)
    try:
        print(f"  → Attempting to pull {full_image} to get size...")
        result = subprocess.run(
            ["docker", "pull", full_image],  # argument list, no shell interpolation
            capture_output=True,
            text=True
        )
        
        if result.returncode == 0:
            size = get_image_size_from_local_docker(full_image)
            if size:
                print(f"  → Found size after pull: {format_size(size)}")
                return size
        else:
            print(f"  → Pull failed: {result.stderr}")
            
    except Exception as e:
        print(f"  → Pull attempt failed: {e}")
    
    # Method 5: Try anonymous registry access (some registries allow manifest read)
    try:
        manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
        headers = {
            'Accept': 'application/vnd.docker.distribution.manifest.v2+json'
        }
        
        response = requests.get(manifest_url, headers=headers, timeout=5, verify=False)
        if response.status_code == 200:
            manifest = response.json()
            if 'layers' in manifest:
                total_size = sum(layer.get('size', 0) for layer in manifest['layers'])
                if total_size > 0:
                    print(f"  → Found size from anonymous manifest: {format_size(total_size)}")
                    return total_size
                    
    except Exception as e:
        print(f"  → Anonymous manifest access failed: {e}")
    
    # Method 6: Estimate based on base image
    estimated_size = estimate_image_size_by_base_image(repository, tag)
    print(f"  → Estimated size based on base image: {format_size(estimated_size)}")
    
    return estimated_size

def get_nexus_image_size(registry, repository, tag, username=None, password=None):
    """Get image size from Nexus Registry API"""
    try:
        # Nexus Registry API v2
        # URL format: https://nexus.example.com/repository/docker-repo/v2/{repository}/manifests/{tag}
        
        # Clean up repository name for API call
        repo_name = repository.replace('/', '%2F')  # URL encode slashes
        
        # Try different API endpoints that Nexus commonly uses
        possible_urls = [
            f"https://{registry}/repository/docker-hosted/v2/{repository}/manifests/{tag}",
            f"https://{registry}/repository/docker-proxy/v2/{repository}/manifests/{tag}",
            f"https://{registry}/repository/docker-group/v2/{repository}/manifests/{tag}",
            f"https://{registry}/v2/{repository}/manifests/{tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-hosted&name={repository}&version={tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-proxy&name={repository}&version={tag}",
            f"https://{registry}/service/rest/v1/search?repository=docker-group&name={repository}&version={tag}"
        ]
        
        # Set up authentication
        auth = None
        if username and password:
            auth = (username, password)
        
        headers = {
            'Accept': 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.docker.distribution.manifest.v1+json'
        }
        
        for url in possible_urls:
            try:
                response = requests.get(url, auth=auth, headers=headers, timeout=10, verify=False)
                
                if response.status_code == 200:
                    data = response.json()
                    
                    # Handle different response formats
                    if 'items' in data and len(data['items']) > 0:
                        # Nexus search API response
                        item = data['items'][0]
                        if 'assets' in item and len(item['assets']) > 0:
                            # Sum up all layer sizes
                            total_size = 0
                            for asset in item['assets']:
                                if 'fileSize' in asset:
                                    total_size += asset['fileSize']
                            return total_size
                    
                    elif 'config' in data:
                        # Docker manifest v2 format
                        config_size = data['config'].get('size', 0)
                        layers_size = sum(layer.get('size', 0) for layer in data.get('layers', []))
                        return config_size + layers_size
                    
                    elif 'architecture' in data:
                        # Docker manifest v1 format  
                        return data.get('size', 0)
                        
            except Exception as e:
                print(f"Error trying URL {url}: {e}")
                continue
        
        # If direct API calls fail, try to get manifest and calculate size
        manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
        response = requests.get(manifest_url, auth=auth, headers=headers, timeout=10, verify=False)
        
        if response.status_code == 200:
            manifest = response.json()
            if 'layers' in manifest:
                return sum(layer.get('size', 0) for layer in manifest['layers'])
        
    except Exception as e:
        print(f"Error getting size for {registry}/{repository}:{tag}: {e}")
    
    return None

def get_image_size(registry, repository, tag, full_image, namespace=None, deployment_name=None, container_name=None):
    """Get image size based on registry with fallback methods"""
    
    # Configuration for Nexus registry
    NEXUS_REGISTRIES = {
        # Add your Nexus registry configurations here
        # Format: 'registry_host': {'username': 'your_username', 'password': 'your_password'}
        # Example:
        # 'nexus.company.com': {'username': 'admin', 'password': 'admin123'},
        # 'nexus.company.com:8443': {'username': 'admin', 'password': 'admin123'},
    }
    
    if registry == 'docker.io':
        size = get_dockerhub_image_size(repository, tag)
        if size:
            return size
        # If Docker Hub fails, try alternative methods
        return get_image_size_alternative_methods(registry, repository, tag, full_image, namespace, deployment_name, container_name)
    
    elif registry in NEXUS_REGISTRIES:
        # Use configured credentials for Nexus
        creds = NEXUS_REGISTRIES[registry]
        size = get_nexus_image_size(registry, repository, tag, 
                                   creds.get('username'), creds.get('password'))
        if size:
            return size
    
    # Try to get credentials from environment variables
    import os  # local import: `os` is not among the script's top-level imports
    env_suffix = registry.replace(".", "_").replace(":", "_").upper()
    username = os.getenv(f'NEXUS_USERNAME_{env_suffix}')
    password = os.getenv(f'NEXUS_PASSWORD_{env_suffix}')
    
    if not username:
        username = os.getenv('NEXUS_USERNAME')
        password = os.getenv('NEXUS_PASSWORD')
    
    # Try to detect if it's a Nexus registry and attempt without auth first
    if 'nexus' in registry.lower() or any(port in registry for port in [':8081', ':8443', ':5000']):
        # Try without authentication first
        size = get_nexus_image_size(registry, repository, tag)
        if size:
            return size
        
        # Try with environment variables
        if username and password:
            size = get_nexus_image_size(registry, repository, tag, username, password)
            if size:
                return size
        
        # If no size found, try with common default credentials
        common_creds = [
            ('admin', 'admin123'),
            ('admin', 'admin'),
            ('nexus', 'nexus'),
            ('docker', 'docker')
        ]
        
        for username, password in common_creds:
            size = get_nexus_image_size(registry, repository, tag, username, password)
            if size:
                return size
    
    # If all registry-specific methods fail, try alternative methods
    return get_image_size_alternative_methods(registry, repository, tag, full_image, namespace, deployment_name, container_name)

def format_size(size_bytes):
    """Format size in bytes to human readable format"""
    if size_bytes is None:
        return "Unknown"
    
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f} TB"

def extract_images_from_deployments(deployments):
    """Extract image information from deployments"""
    images = []
    
    for deployment in deployments:
        namespace = deployment.get('metadata', {}).get('namespace', 'default')
        deployment_name = deployment.get('metadata', {}).get('name', 'unknown')
        
        spec = deployment.get('spec', {})
        template = spec.get('template', {})
        pod_spec = template.get('spec', {})
        
        # Extract from containers
        containers = pod_spec.get('containers', [])
        for container in containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': container.get('name', 'unknown'),
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
        
        # Extract from init containers
        init_containers = pod_spec.get('initContainers', [])
        for container in init_containers:
            if 'image' in container:
                image_info = parse_image_info(container['image'])
                images.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': f"{container.get('name', 'unknown')} (init)",
                    'registry': image_info['registry'],
                    'repository': image_info['repository'],
                    'tag': image_info['tag'],
                    'full_image': image_info['full_image']
                })
    
    return images

def main():
    """Main function"""
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)
    
    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)
    
    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    
    print(f"Found {len(deployments)} deployments")
    
    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    
    print(f"Found {len(images)} images")
    
    # Get image sizes (this might take a while for many images)
    print("Getting image sizes...")
    for i, image in enumerate(images):
        print(f"Processing {i+1}/{len(images)}: {image['full_image']}")
        size = get_image_size(
            image['registry'], 
            image['repository'], 
            image['tag'], 
            image['full_image'],
            image['namespace'],
            image['deployment'],
            image['container']
        )
        image['size_bytes'] = size
        image['size_formatted'] = format_size(size)
    
    # Create DataFrame
    df = pd.DataFrame(images)
    
    # Sort by repository, then by tag
    df = df.sort_values(['repository', 'tag'])
    
    # Prepare final columns for Excel
    final_columns = [
        'namespace',
        'deployment', 
        'container',
        'registry',
        'repository',
        'tag',
        'size_formatted',
        'full_image'
    ]
    
    df_final = df[final_columns].copy()
    df_final.columns = [
        'Namespace',
        'Deployment',
        'Container',
        'Registry',
        'Repository',
        'Tag',
        'Size',
        'Full Image'
    ]
    
    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"
    
    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)
        
        # Auto-adjust column widths
        worksheet = writer.sheets['Deployment Images']
        for column in worksheet.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except Exception:
                    pass
            adjusted_width = min(max_length + 2, 50)
            worksheet.column_dimensions[column_letter].width = adjusted_width
    
    print(f"Successfully exported {len(df_final)} image records to {filename}")
    
    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")
    
    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))

if __name__ == "__main__":
    main()
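
The manifest-based size lookup in the script can be checked offline. Below is a minimal sketch, assuming a manifest shaped like a Docker image manifest v2; the helper name `manifest_compressed_size` and the sample sizes are hypothetical and not part of the script. Note that the sizes in a manifest describe the compressed blobs, so the total is the transfer size rather than the unpacked size on disk.

```python
def manifest_compressed_size(manifest):
    """Sum config and layer blob sizes (bytes) from an image manifest v2 dict."""
    config_size = manifest.get("config", {}).get("size", 0)
    layers_size = sum(layer.get("size", 0) for layer in manifest.get("layers", []))
    return config_size + layers_size


def format_size(size_bytes):
    """Same rule as the script's format_size: 1024-based units, one decimal place."""
    if size_bytes is None:
        return "Unknown"
    size = float(size_bytes)
    for unit in ["B", "KB", "MB", "GB"]:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"


# Hypothetical sample; a real manifest comes from GET /v2/<name>/manifests/<tag>.
sample_manifest = {
    "schemaVersion": 2,
    "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
    "config": {"size": 7023},
    "layers": [{"size": 32654}, {"size": 16724}, {"size": 73109}],
}

total = manifest_compressed_size(sample_manifest)
print(total, format_size(total))  # 129510 126.5 KB
```

Because the helper tolerates missing `config` or `layers` keys, it returns 0 for an unexpected response body instead of raising, which the caller can treat as "size unknown".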