Kubernetes deployment에서 이미지 정보를 추출하여 Excel 파일로 만드는 Python 스크립트를 작성해드리겠습니다.
이 스크립트를 실행하기 위해 필요한 패키지를 설치하는 requirements.txt 파일도 만들어드리겠습니다.
## 사용법:
pip install -r requirements.txt
requirements.txt:
pandas>=1.5.0
openpyxl>=3.0.0
requests>=2.28.0
python k8s_image_extractor.py
모든 네임스페이스의 deployment 조회: `kubectl get deployments --all-namespaces -o json`으로 모든 deployment를 가져옵니다.
이미지 정보 파싱:
- Docker Hub 이미지 (nginx:latest)
- 외부 레지스트리 이미지 (gcr.io/project/image:tag)
- 포트가 포함된 레지스트리 이미지 (localhost:5000/image:tag)
이미지 크기 조회: Docker Hub API를 통해 이미지 크기를 가져옵니다.
Excel 파일 생성:
Excel 파일에는 다음과 같은 정보가 포함됩니다:
- Repository: library/nginx, gcr.io/project/app
- Tag: latest, v1.0.0, stable
- Size: 142.3 MB, 1.2 GB
#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor
This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.
Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""
import subprocess
import json
import pandas as pd
import re
import requests
from urllib.parse import urlparse
import sys
from datetime import datetime
def run_kubectl_command(command):
    """Run a kubectl command and return its standard output.

    The command is executed directly (no shell), so nothing in it is ever
    interpreted by ``/bin/sh``.

    Args:
        command: The command line, either as one string (split using shell
            quoting rules) or as an already-split sequence of arguments.

    Returns:
        The captured stdout as text, or ``None`` when the command line is
        empty or malformed, the executable cannot be started, or it exits
        with a non-zero status.
    """
    import shlex

    try:
        argv = shlex.split(command) if isinstance(command, str) else list(command)
        if not argv:
            print("Error executing kubectl command: empty command")
            return None
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None
    except (OSError, ValueError) as e:
        # OSError: executable missing or not runnable.
        # ValueError: unbalanced quotes in the command string.
        print(f"Error executing kubectl command: {e}")
        return None
    return result.stdout
def get_all_deployments():
    """Return the Deployment objects of every namespace as a list of dicts.

    An empty list is returned when kubectl fails or its output is not
    valid JSON.
    """
    print("Getting all deployments...")
    raw_output = run_kubectl_command(
        "kubectl get deployments --all-namespaces -o json"
    )
    if raw_output is None:
        return []

    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []
    return parsed.get('items', [])
def parse_image_info(image_string):
    """Split a container image reference into registry, repository and tag.

    Handled forms include:
      - nginx, nginx:latest
      - docker.io/library/nginx:latest
      - gcr.io/project/image:tag
      - localhost:5000/image (registry port, no tag)
      - name@sha256:<digest> and name:tag@sha256:<digest>

    Args:
        image_string: Image reference as written in the pod spec.

    Returns:
        dict with keys ``registry``, ``repository``, ``tag`` and
        ``full_image`` (the unmodified input). When the reference has a
        digest but no tag, ``tag`` holds the digest; when it has neither,
        ``tag`` is ``'latest'``.
    """
    # Remove the digest first so its "sha256:" colon is not taken for a tag.
    name, _, digest = image_string.partition('@')

    # Only a colon after the last slash separates a tag; an earlier colon
    # belongs to a registry port such as localhost:5000.
    colon_at = name.rfind(':')
    if colon_at > name.rfind('/'):
        image_repo = name[:colon_at]
        tag = name[colon_at + 1:]
    else:
        image_repo = name
        tag = digest or 'latest'

    # The first path component is a registry host only if it looks like one.
    first, slash, remainder = image_repo.partition('/')
    if slash and ('.' in first or ':' in first or first == 'localhost'):
        registry = first
        repository = remainder
    else:
        registry = 'docker.io'
        repository = image_repo

    # Docker Hub official images live under the implicit "library" namespace.
    if registry == 'docker.io' and '/' not in repository:
        repository = f'library/{repository}'

    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string,
    }
def get_dockerhub_image_size(repository, tag):
    """Look up an image's size through the Docker Hub tags API.

    Args:
        repository: Docker Hub repository, e.g. ``library/nginx`` or
            ``bitnami/redis``. A name without a namespace is treated as an
            official image under ``library/``.
        tag: Image tag.

    Returns:
        The ``full_size`` value reported by Docker Hub, or ``None`` when
        the request fails, the tag is not found, or the response carries
        no size.
    """
    if '/' not in repository:
        repository = f'library/{repository}'
    url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"

    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error getting size for {repository}:{tag}: {e}")
        return None

    if isinstance(data, dict):
        return data.get('full_size')
    return None
def get_image_size(registry, repository, tag):
    """Return the image size for the given registry, or None if unsupported.

    Only Docker Hub is queried; other registries would need
    registry-specific (usually authenticated) lookups.
    """
    if registry != 'docker.io':
        return None
    return get_dockerhub_image_size(repository, tag)
def format_size(size_bytes):
    """Render a byte count as text with one decimal place and a unit.

    ``None`` is rendered as ``"Unknown"``. Values are scaled by 1024 per
    step through B, KB, MB and GB; anything larger is reported in TB.
    """
    if size_bytes is None:
        return "Unknown"

    value = size_bytes
    units = ('B', 'KB', 'MB', 'GB')
    position = 0
    while position < len(units):
        if value < 1024.0:
            return f"{value:.1f} {units[position]}"
        value = value / 1024.0
        position += 1
    return f"{value:.1f} TB"
def extract_images_from_deployments(deployments):
    """Build one record per container image found in the deployments.

    For each deployment the regular containers are listed first, then the
    init containers (whose container name gets an " (init)" suffix).
    Containers without an ``image`` field are skipped.
    """
    records = []
    for item in deployments:
        ns = item.get('metadata', {}).get('namespace', 'default')
        name = item.get('metadata', {}).get('name', 'unknown')
        pod_spec = item.get('spec', {}).get('template', {}).get('spec', {})

        for spec_key, is_init in (('containers', False), ('initContainers', True)):
            for entry in pod_spec.get(spec_key, []):
                if 'image' not in entry:
                    continue
                parsed = parse_image_info(entry['image'])
                label = entry.get('name', 'unknown')
                if is_init:
                    label = f"{label} (init)"
                records.append({
                    'namespace': ns,
                    'deployment': name,
                    'container': label,
                    'registry': parsed['registry'],
                    'repository': parsed['repository'],
                    'tag': parsed['tag'],
                    'full_image': parsed['full_image'],
                })
    return records
def main():
    """Collect deployment images, look up their sizes and write an Excel report.

    Exits with status 1 when kubectl is unusable, when no deployments are
    returned, or when the deployments contain no images.
    """
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)

    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)

    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    print(f"Found {len(deployments)} deployments")

    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    print(f"Found {len(images)} images")

    # Get image sizes. Many deployments share the same image, so each
    # distinct (registry, repository, tag) is looked up only once.
    print("Getting image sizes...")
    size_cache = {}
    for position, image in enumerate(images, start=1):
        print(f"Processing {position}/{len(images)}: {image['full_image']}")
        key = (image['registry'], image['repository'], image['tag'])
        if key not in size_cache:
            size_cache[key] = get_image_size(*key)
        image['size_bytes'] = size_cache[key]
        image['size_formatted'] = format_size(size_cache[key])

    # Sort by repository, then by tag
    df = pd.DataFrame(images).sort_values(['repository', 'tag'])

    # Internal column name -> Excel header, in output order
    column_titles = {
        'namespace': 'Namespace',
        'deployment': 'Deployment',
        'container': 'Container',
        'registry': 'Registry',
        'repository': 'Repository',
        'tag': 'Tag',
        'size_formatted': 'Size',
        'full_image': 'Full Image',
    }
    df_final = df[list(column_titles)].rename(columns=column_titles)

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"

    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)

        # Auto-adjust column widths (capped at 50 characters)
        worksheet = writer.sheets['Deployment Images']
        for column in worksheet.columns:
            longest = max((len(str(cell.value)) for cell in column), default=0)
            column_letter = column[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(longest + 2, 50)

    print(f"Successfully exported {len(df_final)} image records to {filename}")

    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")

    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))


if __name__ == "__main__":
    main()
export NEXUS_USERNAME="myuser"
export NEXUS_PASSWORD="mypass"
#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor
This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.
Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""
import subprocess
import json
import pandas as pd
import re
import requests
from urllib.parse import urlparse
import sys
import os
from datetime import datetime
def run_kubectl_command(command):
    """Run a kubectl command and return its standard output.

    The command is executed directly (no shell), so nothing in it is ever
    interpreted by ``/bin/sh``.

    Args:
        command: The command line, either as one string (split using shell
            quoting rules) or as an already-split sequence of arguments.

    Returns:
        The captured stdout as text, or ``None`` when the command line is
        empty or malformed, the executable cannot be started, or it exits
        with a non-zero status.
    """
    import shlex

    try:
        argv = shlex.split(command) if isinstance(command, str) else list(command)
        if not argv:
            print("Error executing kubectl command: empty command")
            return None
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None
    except (OSError, ValueError) as e:
        # OSError: executable missing or not runnable.
        # ValueError: unbalanced quotes in the command string.
        print(f"Error executing kubectl command: {e}")
        return None
    return result.stdout
def get_all_deployments():
    """Return the Deployment objects of every namespace as a list of dicts.

    An empty list is returned when kubectl fails or its output is not
    valid JSON.
    """
    print("Getting all deployments...")
    raw_output = run_kubectl_command(
        "kubectl get deployments --all-namespaces -o json"
    )
    if raw_output is None:
        return []

    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []
    return parsed.get('items', [])
def parse_image_info(image_string):
    """Split a container image reference into registry, repository and tag.

    Handled forms include:
      - nginx, nginx:latest
      - docker.io/library/nginx:latest
      - gcr.io/project/image:tag
      - localhost:5000/image (registry port, no tag)
      - name@sha256:<digest> and name:tag@sha256:<digest>

    Args:
        image_string: Image reference as written in the pod spec.

    Returns:
        dict with keys ``registry``, ``repository``, ``tag`` and
        ``full_image`` (the unmodified input). When the reference has a
        digest but no tag, ``tag`` holds the digest; when it has neither,
        ``tag`` is ``'latest'``.
    """
    # Remove the digest first so its "sha256:" colon is not taken for a tag.
    name, _, digest = image_string.partition('@')

    # Only a colon after the last slash separates a tag; an earlier colon
    # belongs to a registry port such as localhost:5000.
    colon_at = name.rfind(':')
    if colon_at > name.rfind('/'):
        image_repo = name[:colon_at]
        tag = name[colon_at + 1:]
    else:
        image_repo = name
        tag = digest or 'latest'

    # The first path component is a registry host only if it looks like one.
    first, slash, remainder = image_repo.partition('/')
    if slash and ('.' in first or ':' in first or first == 'localhost'):
        registry = first
        repository = remainder
    else:
        registry = 'docker.io'
        repository = image_repo

    # Docker Hub official images live under the implicit "library" namespace.
    if registry == 'docker.io' and '/' not in repository:
        repository = f'library/{repository}'

    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string,
    }
def get_dockerhub_image_size(repository, tag):
    """Look up an image's size through the Docker Hub tags API.

    Args:
        repository: Docker Hub repository, e.g. ``library/nginx`` or
            ``bitnami/redis``. A name without a namespace is treated as an
            official image under ``library/``.
        tag: Image tag.

    Returns:
        The ``full_size`` value reported by Docker Hub, or ``None`` when
        the request fails, the tag is not found, or the response carries
        no size.
    """
    if '/' not in repository:
        repository = f'library/{repository}'
    url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"

    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error getting size for {repository}:{tag}: {e}")
        return None

    if isinstance(data, dict):
        return data.get('full_size')
    return None
def _nexus_payload_size(data):
    """Return the image size in bytes described by a JSON payload, or None."""
    if not isinstance(data, dict):
        return None

    items = data.get('items')
    if items:
        # Nexus search API response: add up the first component's assets.
        assets = items[0].get('assets') or []
        return sum(asset.get('fileSize', 0) for asset in assets) or None

    if 'config' in data:
        # Docker manifest v2: config blob plus every layer.
        config_size = data['config'].get('size', 0)
        layers_size = sum(layer.get('size', 0) for layer in data.get('layers', []))
        return (config_size + layers_size) or None

    if 'architecture' in data:
        # Docker manifest v1: use a top-level size only if one is present.
        return data.get('size') or None

    return None


def get_nexus_image_size(registry, repository, tag, username=None, password=None,
                         verify_tls=False):
    """Get an image's size from a Nexus-hosted Docker registry.

    Tries the manifest endpoints of the conventional ``docker-hosted``,
    ``docker-proxy`` and ``docker-group`` repositories, the plain ``/v2``
    manifest endpoint, and then the Nexus search API for the same three
    repositories, returning the first usable size.

    Args:
        registry: Registry host, optionally with port.
        repository: Image repository path.
        tag: Image tag.
        username: Optional basic-auth user; used only with ``password``.
        password: Optional basic-auth password.
        verify_tls: Whether to verify the server certificate. Defaults to
            ``False`` to keep working with self-signed Nexus installs.
            NOTE(review): with verification off, credentials can be
            intercepted; pass ``True`` wherever certificates are valid.

    Returns:
        Size in bytes, or ``None`` when no endpoint yields a size. A
        response without size information counts as "unknown" rather than
        as a zero-byte image.
    """
    base = f"https://{registry}"
    nexus_repos = ('docker-hosted', 'docker-proxy', 'docker-group')

    # (url, query parameters) pairs, tried in order.
    attempts = [
        (f"{base}/repository/{name}/v2/{repository}/manifests/{tag}", None)
        for name in nexus_repos
    ]
    attempts.append((f"{base}/v2/{repository}/manifests/{tag}", None))
    attempts.extend(
        (
            f"{base}/service/rest/v1/search",
            {'repository': name, 'name': repository, 'version': tag},
        )
        for name in nexus_repos
    )

    auth = (username, password) if username and password else None
    headers = {
        'Accept': 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.docker.distribution.manifest.v1+json'
    }

    for url, params in attempts:
        try:
            response = requests.get(
                url,
                params=params,
                auth=auth,
                headers=headers,
                timeout=10,
                verify=verify_tls,
            )
            if response.status_code != 200:
                continue
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error trying URL {url}: {e}")
            continue

        size = _nexus_payload_size(data)
        if size:
            return size
    return None
def get_image_size(registry, repository, tag):
    """Get an image's size, choosing the lookup method by registry.

    Docker Hub images use the Docker Hub API. Registries listed in
    ``NEXUS_REGISTRIES`` use their configured credentials. Any other
    registry is queried only if it looks like Nexus ("nexus" in the host
    name or one of the ports 8081/8443/5000): first anonymously, then with
    credentials from the environment, then with well-known defaults.

    Environment variables: ``NEXUS_USERNAME_<REGISTRY>`` and
    ``NEXUS_PASSWORD_<REGISTRY>`` (dots and colons replaced by underscores,
    upper-cased) are used when both are set; otherwise ``NEXUS_USERNAME``
    and ``NEXUS_PASSWORD``.

    Returns:
        Size in bytes, or ``None`` if it could not be determined.
    """
    # Configuration for Nexus registry
    NEXUS_REGISTRIES = {
        # Add your Nexus registry configurations here
        # Format: 'registry_host': {'username': 'your_username', 'password': 'your_password'}
        # Example:
        # 'nexus.company.com': {'username': 'admin', 'password': 'admin123'},
        # 'nexus.company.com:8443': {'username': 'admin', 'password': 'admin123'},
    }

    if registry == 'docker.io':
        return get_dockerhub_image_size(repository, tag)

    if registry in NEXUS_REGISTRIES:
        creds = NEXUS_REGISTRIES[registry]
        return get_nexus_image_size(registry, repository, tag,
                                    creds.get('username'), creds.get('password'))

    looks_like_nexus = 'nexus' in registry.lower() or any(
        port in registry for port in (':8081', ':8443', ':5000')
    )
    if not looks_like_nexus:
        # For other registries, we can't easily get size without authentication
        return None

    # Credentials are taken as a pair: the registry-specific pair when it
    # is complete, otherwise the generic pair.
    env_suffix = registry.replace(".", "_").replace(":", "_").upper()
    username = os.getenv(f'NEXUS_USERNAME_{env_suffix}')
    password = os.getenv(f'NEXUS_PASSWORD_{env_suffix}')
    if not (username and password):
        username = os.getenv('NEXUS_USERNAME')
        password = os.getenv('NEXUS_PASSWORD')

    attempts = [(None, None)]  # anonymous access first
    if username and password:
        attempts.append((username, password))
    # NOTE(review): probing well-known default credentials against a
    # registry can trigger account lockouts or security alerts; confirm
    # this is acceptable in the target environment.
    attempts.extend([
        ('admin', 'admin123'),
        ('admin', 'admin'),
        ('nexus', 'nexus'),
        ('docker', 'docker'),
    ])

    for user, secret in attempts:
        size = get_nexus_image_size(registry, repository, tag, user, secret)
        if size is not None:
            return size
    return None
def format_size(size_bytes):
    """Render a byte count as text with one decimal place and a unit.

    ``None`` is rendered as ``"Unknown"``. Values are scaled by 1024 per
    step through B, KB, MB and GB; anything larger is reported in TB.
    """
    if size_bytes is None:
        return "Unknown"

    value = size_bytes
    units = ('B', 'KB', 'MB', 'GB')
    position = 0
    while position < len(units):
        if value < 1024.0:
            return f"{value:.1f} {units[position]}"
        value = value / 1024.0
        position += 1
    return f"{value:.1f} TB"
def extract_images_from_deployments(deployments):
    """Build one record per container image found in the deployments.

    For each deployment the regular containers are listed first, then the
    init containers (whose container name gets an " (init)" suffix).
    Containers without an ``image`` field are skipped.
    """
    records = []
    for item in deployments:
        ns = item.get('metadata', {}).get('namespace', 'default')
        name = item.get('metadata', {}).get('name', 'unknown')
        pod_spec = item.get('spec', {}).get('template', {}).get('spec', {})

        for spec_key, is_init in (('containers', False), ('initContainers', True)):
            for entry in pod_spec.get(spec_key, []):
                if 'image' not in entry:
                    continue
                parsed = parse_image_info(entry['image'])
                label = entry.get('name', 'unknown')
                if is_init:
                    label = f"{label} (init)"
                records.append({
                    'namespace': ns,
                    'deployment': name,
                    'container': label,
                    'registry': parsed['registry'],
                    'repository': parsed['repository'],
                    'tag': parsed['tag'],
                    'full_image': parsed['full_image'],
                })
    return records
def main():
    """Collect deployment images, look up their sizes and write an Excel report.

    Exits with status 1 when kubectl is unusable, when no deployments are
    returned, or when the deployments contain no images.
    """
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)

    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)

    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    print(f"Found {len(deployments)} deployments")

    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    print(f"Found {len(images)} images")

    # Get image sizes. Many deployments share the same image, so each
    # distinct (registry, repository, tag) is looked up only once.
    print("Getting image sizes...")
    size_cache = {}
    for position, image in enumerate(images, start=1):
        print(f"Processing {position}/{len(images)}: {image['full_image']}")
        key = (image['registry'], image['repository'], image['tag'])
        if key not in size_cache:
            size_cache[key] = get_image_size(*key)
        image['size_bytes'] = size_cache[key]
        image['size_formatted'] = format_size(size_cache[key])

    # Sort by repository, then by tag
    df = pd.DataFrame(images).sort_values(['repository', 'tag'])

    # Internal column name -> Excel header, in output order
    column_titles = {
        'namespace': 'Namespace',
        'deployment': 'Deployment',
        'container': 'Container',
        'registry': 'Registry',
        'repository': 'Repository',
        'tag': 'Tag',
        'size_formatted': 'Size',
        'full_image': 'Full Image',
    }
    df_final = df[list(column_titles)].rename(columns=column_titles)

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"

    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)

        # Auto-adjust column widths (capped at 50 characters)
        worksheet = writer.sheets['Deployment Images']
        for column in worksheet.columns:
            longest = max((len(str(cell.value)) for cell in column), default=0)
            column_letter = column[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(longest + 2, 50)

    print(f"Successfully exported {len(df_final)} image records to {filename}")

    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")

    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Kubernetes Deployment Image Information Extractor
This script extracts image information from all Kubernetes deployments,
gets image sizes, and exports the data to an Excel file.
Requirements:
- kubectl configured and accessible
- Python packages: pandas, openpyxl, requests
"""
import subprocess
import json
import pandas as pd
import re
import requests
from urllib.parse import urlparse
import sys
import os
from datetime import datetime
def run_kubectl_command(command):
    """Run a kubectl command and return its standard output.

    The command is executed directly (no shell), so nothing in it is ever
    interpreted by ``/bin/sh``.

    Args:
        command: The command line, either as one string (split using shell
            quoting rules) or as an already-split sequence of arguments.

    Returns:
        The captured stdout as text, or ``None`` when the command line is
        empty or malformed, the executable cannot be started, or it exits
        with a non-zero status.
    """
    import shlex

    try:
        argv = shlex.split(command) if isinstance(command, str) else list(command)
        if not argv:
            print("Error executing kubectl command: empty command")
            return None
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        print(f"Error output: {e.stderr}")
        return None
    except (OSError, ValueError) as e:
        # OSError: executable missing or not runnable.
        # ValueError: unbalanced quotes in the command string.
        print(f"Error executing kubectl command: {e}")
        return None
    return result.stdout
def get_all_deployments():
    """Return the Deployment objects of every namespace as a list of dicts.

    An empty list is returned when kubectl fails or its output is not
    valid JSON.
    """
    print("Getting all deployments...")
    raw_output = run_kubectl_command(
        "kubectl get deployments --all-namespaces -o json"
    )
    if raw_output is None:
        return []

    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return []
    return parsed.get('items', [])
def parse_image_info(image_string):
    """Split a container image reference into registry, repository and tag.

    Handled forms include:
      - nginx, nginx:latest
      - docker.io/library/nginx:latest
      - gcr.io/project/image:tag
      - localhost:5000/image (registry port, no tag)
      - name@sha256:<digest> and name:tag@sha256:<digest>

    Args:
        image_string: Image reference as written in the pod spec.

    Returns:
        dict with keys ``registry``, ``repository``, ``tag`` and
        ``full_image`` (the unmodified input). When the reference has a
        digest but no tag, ``tag`` holds the digest; when it has neither,
        ``tag`` is ``'latest'``.
    """
    # Remove the digest first so its "sha256:" colon is not taken for a tag.
    name, _, digest = image_string.partition('@')

    # Only a colon after the last slash separates a tag; an earlier colon
    # belongs to a registry port such as localhost:5000.
    colon_at = name.rfind(':')
    if colon_at > name.rfind('/'):
        image_repo = name[:colon_at]
        tag = name[colon_at + 1:]
    else:
        image_repo = name
        tag = digest or 'latest'

    # The first path component is a registry host only if it looks like one.
    first, slash, remainder = image_repo.partition('/')
    if slash and ('.' in first or ':' in first or first == 'localhost'):
        registry = first
        repository = remainder
    else:
        registry = 'docker.io'
        repository = image_repo

    # Docker Hub official images live under the implicit "library" namespace.
    if registry == 'docker.io' and '/' not in repository:
        repository = f'library/{repository}'

    return {
        'registry': registry,
        'repository': repository,
        'tag': tag,
        'full_image': image_string,
    }
def get_dockerhub_image_size(repository, tag):
    """Look up an image's size through the Docker Hub tags API.

    Args:
        repository: Docker Hub repository, e.g. ``library/nginx`` or
            ``bitnami/redis``. A name without a namespace is treated as an
            official image under ``library/``.
        tag: Image tag.

    Returns:
        The ``full_size`` value reported by Docker Hub, or ``None`` when
        the request fails, the tag is not found, or the response carries
        no size.
    """
    if '/' not in repository:
        repository = f'library/{repository}'
    url = f"https://hub.docker.com/v2/repositories/{repository}/tags/{tag}"

    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error getting size for {repository}:{tag}: {e}")
        return None

    if isinstance(data, dict):
        return data.get('full_size')
    return None
def get_image_size_from_local_docker(full_image):
    """Get an image's size from the local Docker daemon.

    Runs ``docker images <image> --format {{.Size}}`` without a shell and
    parses the first line of output.

    Args:
        full_image: Image reference to look up locally.

    Returns:
        Size in bytes as parsed by ``parse_docker_size``, or ``None`` when
        Docker is unavailable, the command fails, or the image is not
        present locally.
    """
    try:
        result = subprocess.run(
            # The braces are literal Go-template syntax; written as a plain
            # string (not an f-string) so they are not collapsed.
            ["docker", "images", full_image, "--format", "{{.Size}}"],
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error getting local Docker image size: {e}")
        return None

    if result.returncode != 0:
        return None

    sizes = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    if not sizes:
        return None
    return parse_docker_size(sizes[0])
def get_image_size_from_kubectl_describe(namespace, deployment_name, container_name):
    """Try to read an image size from ``kubectl describe pod`` output.

    Selects pods with the label ``app=<deployment_name>`` in the given
    namespace, describes the first one, and scans the text for a line
    containing ``Size:`` followed by a number and a B/KB/MB/GB unit.

    Args:
        namespace: Namespace of the deployment.
        deployment_name: Deployment name, used as the value of the ``app``
            label selector.
            NOTE(review): assumes pods carry an ``app`` label equal to the
            deployment name — confirm for the target cluster.
        container_name: Currently unused; kept for interface compatibility.

    Returns:
        Size in bytes, or ``None`` when kubectl fails, no pod matches, or
        no size line is found.
    """
    try:
        # Argument lists (no shell) so names are never shell-interpreted.
        pods = subprocess.run(
            ["kubectl", "get", "pods", "-n", namespace,
             "-l", f"app={deployment_name}", "-o", "json"],
            capture_output=True,
            text=True,
        )
        if pods.returncode != 0:
            return None

        items = json.loads(pods.stdout).get('items')
        if not items:
            return None
        pod_name = items[0]['metadata']['name']

        described = subprocess.run(
            ["kubectl", "describe", "pod", pod_name, "-n", namespace],
            capture_output=True,
            text=True,
        )
        if described.returncode != 0:
            return None

        for line in described.stdout.split('\n'):
            # 'Image Size:' also contains 'Size:', so one check covers both.
            if 'Size:' in line:
                size_match = re.search(r'(\d+\.?\d*)\s*(MB|GB|KB|B)', line)
                if size_match:
                    return parse_size_string(size_match.group(1), size_match.group(2))
    except (OSError, subprocess.SubprocessError, ValueError,
            KeyError, IndexError, TypeError, AttributeError) as e:
        print(f"Error getting image size from kubectl: {e}")
    return None
def get_image_size_from_docker_history(full_image):
    """Estimate an image's size by summing its layer sizes.

    Runs ``docker history <image> --format {{.Size}}`` without a shell and
    adds up every layer size that ``parse_docker_size`` can parse.

    Args:
        full_image: Image reference to inspect locally.

    Returns:
        Total size in bytes, or ``None`` when Docker is unavailable, the
        command fails, or the total is zero.
    """
    try:
        result = subprocess.run(
            # Plain string, not an f-string: the double braces are literal
            # Go-template syntax.
            ["docker", "history", full_image, "--format", "{{.Size}}"],
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error getting image history: {e}")
        return None

    if result.returncode != 0:
        return None

    total_size = 0
    for line in result.stdout.splitlines():
        entry = line.strip()
        if not entry or entry == '0B':
            continue
        layer_size = parse_docker_size(entry)
        if layer_size:
            total_size += layer_size
    return total_size if total_size > 0 else None
def estimate_image_size_by_base_image(repository, tag):
    """Guess an image size from a well-known base image name in its repository.

    The first known base-image name found as a substring of the repository
    (checked in table order) selects a base size, to which a fixed
    allowance for application layers is added. This is a rough estimate,
    not a measurement.

    Args:
        repository: Image repository path.
        tag: Currently unused; kept for interface compatibility.

    Returns:
        Estimated size in bytes, always an ``int``. Unknown images get a
        200 MB default.
    """
    mib = 1024 * 1024

    # Common base image sizes (approximate, in bytes)
    base_image_sizes = {
        'alpine': 5 * mib,
        'ubuntu': 72 * mib,
        'debian': 124 * mib,
        'centos': 200 * mib,
        'node': 900 * mib,
        'nginx': 135 * mib,
        'redis': 113 * mib,
        'postgres': 314 * mib,
        'mysql': 544 * mib,
        'openjdk': 470 * mib,
        'python': 885 * mib,
        'golang': 862 * mib,
        'busybox': int(1.4 * mib),  # truncated so every result is an int
        'scratch': 0,
    }
    lightweight_bases = ('alpine', 'busybox', 'scratch')

    repo_lower = repository.lower()
    for base_name, base_size in base_image_sizes.items():
        if base_name in repo_lower:
            # Allowance for application layers: 50MB on lightweight bases,
            # 150MB otherwise.
            app_layers = (50 if base_name in lightweight_bases else 150) * mib
            return base_size + app_layers

    # Default estimation for unknown images
    return 200 * mib
def parse_docker_size(size_str):
    """Convert a Docker CLI size string such as ``142MB`` or ``1.5kB`` to bytes.

    Docker prints human-readable sizes in decimal units (1 kB = 1000 B)
    and spells kilobytes ``kB``, so units are matched case-insensitively
    and scaled by powers of 1000.

    Args:
        size_str: Size text as printed by ``docker images`` or
            ``docker history``.

    Returns:
        Size in bytes; ``0`` for an empty string or ``'0B'``; ``None``
        when no size can be recognised.
    """
    if not size_str or size_str == '0B':
        return 0

    size_match = re.search(r'(\d+(?:\.\d+)?)\s*(TB|GB|MB|KB|B)', size_str, re.IGNORECASE)
    if not size_match:
        return None

    decimal_multipliers = {
        'B': 1,
        'KB': 1000,
        'MB': 1000 ** 2,
        'GB': 1000 ** 3,
        'TB': 1000 ** 4,
    }
    factor = decimal_multipliers[size_match.group(2).upper()]
    # round() guards against float artefacts such as 1.09 * 1e9.
    return int(round(float(size_match.group(1)) * factor))
def parse_size_string(size_value, unit):
    """Convert a numeric value and a binary unit name to a byte count.

    Args:
        size_value: Number, or text that ``float()`` can parse.
        unit: One of B, KB, MB, GB, TB (case-insensitive), scaled by
            powers of 1024.

    Returns:
        Size in bytes as an ``int``, or ``None`` when the value is not
        numeric or the unit is not recognised.
    """
    unit_multipliers = {
        'B': 1,
        'KB': 1024,
        'MB': 1024 * 1024,
        'GB': 1024 * 1024 * 1024,
        'TB': 1024 * 1024 * 1024 * 1024,
    }
    try:
        size_float = float(size_value)
        multiplier = unit_multipliers[str(unit).upper()]
    except (TypeError, ValueError, KeyError):
        # Non-numeric value or unknown unit: report "unknown" instead of
        # guessing a scale.
        return None
    return int(size_float * multiplier)
def get_image_size_alternative_methods(registry, repository, tag, full_image, namespace=None, deployment_name=None, container_name=None):
    """Try fallback ways of sizing an image when registry APIs give nothing.

    Methods are tried in order and the first non-zero result wins:
      1. the local Docker daemon's image list,
      2. the sum of ``docker history`` layer sizes,
      3. ``kubectl describe pod`` (only when namespace, deployment and
         container names are all given),
      4. pulling the image and reading its local size,
      5. an anonymous registry manifest request,
      6. an estimate based on the base image name.

    Returns:
        Size in bytes. Because of step 6 a value is always returned, and
        it may be an estimate rather than a measurement.
    """
    print(f" Trying alternative methods for {full_image}...")

    # Method 1: Try local Docker daemon
    size = get_image_size_from_local_docker(full_image)
    if size:
        print(f" → Found size from local Docker: {format_size(size)}")
        return size

    # Method 2: Try Docker history
    size = get_image_size_from_docker_history(full_image)
    if size:
        print(f" → Found size from Docker history: {format_size(size)}")
        return size

    # Method 3: Try kubectl describe pod
    if namespace and deployment_name and container_name:
        size = get_image_size_from_kubectl_describe(namespace, deployment_name, container_name)
        if size:
            print(f" → Found size from kubectl describe: {format_size(size)}")
            return size

    # Method 4: Try to pull image and get size (if Docker is available).
    # NOTE(review): this downloads the whole image onto the local machine.
    try:
        print(f" → Attempting to pull {full_image} to get size...")
        # Argument list (no shell) so the image name is never shell-interpreted.
        result = subprocess.run(
            ["docker", "pull", full_image],
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f" → Pull attempt failed: {e}")
    else:
        if result.returncode == 0:
            size = get_image_size_from_local_docker(full_image)
            if size:
                print(f" → Found size after pull: {format_size(size)}")
                return size
        else:
            print(f" → Pull failed: {result.stderr}")

    # Method 5: Try anonymous registry access (some registries allow manifest read)
    try:
        manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
        headers = {
            'Accept': 'application/vnd.docker.distribution.manifest.v2+json'
        }
        response = requests.get(manifest_url, headers=headers, timeout=5, verify=False)
        if response.status_code == 200:
            manifest = response.json()
            if 'layers' in manifest:
                total_size = sum(layer.get('size', 0) for layer in manifest['layers'])
                if total_size > 0:
                    print(f" → Found size from anonymous manifest: {format_size(total_size)}")
                    return total_size
    except (requests.RequestException, ValueError, TypeError, AttributeError) as e:
        print(f" → Anonymous manifest access failed: {e}")

    # Method 6: Estimate based on base image
    estimated_size = estimate_image_size_by_base_image(repository, tag)
    print(f" → Estimated size based on base image: {format_size(estimated_size)}")
    return estimated_size


def _nexus_payload_size(data):
    """Return the image size in bytes described by a JSON payload, or None."""
    if not isinstance(data, dict):
        return None

    items = data.get('items')
    if items:
        # Nexus search API response: add up the first component's assets.
        assets = items[0].get('assets') or []
        return sum(asset.get('fileSize', 0) for asset in assets) or None

    if 'config' in data:
        # Docker manifest v2: config blob plus every layer.
        config_size = data['config'].get('size', 0)
        layers_size = sum(layer.get('size', 0) for layer in data.get('layers', []))
        return (config_size + layers_size) or None

    if 'architecture' in data:
        # Docker manifest v1: use a top-level size only if one is present.
        return data.get('size') or None

    return None


def get_nexus_image_size(registry, repository, tag, username=None, password=None,
                         verify_tls=False):
    """Get an image's size from a Nexus-hosted Docker registry.

    Tries the manifest endpoints of the conventional ``docker-hosted``,
    ``docker-proxy`` and ``docker-group`` repositories, the plain ``/v2``
    manifest endpoint, and then the Nexus search API for the same three
    repositories, returning the first usable size.

    Args:
        registry: Registry host, optionally with port.
        repository: Image repository path.
        tag: Image tag.
        username: Optional basic-auth user; used only with ``password``.
        password: Optional basic-auth password.
        verify_tls: Whether to verify the server certificate. Defaults to
            ``False`` to keep working with self-signed Nexus installs.
            NOTE(review): with verification off, credentials can be
            intercepted; pass ``True`` wherever certificates are valid.

    Returns:
        Size in bytes, or ``None`` when no endpoint yields a size.
    """
    base = f"https://{registry}"
    nexus_repos = ('docker-hosted', 'docker-proxy', 'docker-group')

    # (url, query parameters) pairs, tried in order.
    attempts = [
        (f"{base}/repository/{name}/v2/{repository}/manifests/{tag}", None)
        for name in nexus_repos
    ]
    attempts.append((f"{base}/v2/{repository}/manifests/{tag}", None))
    attempts.extend(
        (
            f"{base}/service/rest/v1/search",
            {'repository': name, 'name': repository, 'version': tag},
        )
        for name in nexus_repos
    )

    auth = (username, password) if username and password else None
    headers = {
        'Accept': 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.docker.distribution.manifest.v1+json'
    }

    for url, params in attempts:
        try:
            response = requests.get(
                url,
                params=params,
                auth=auth,
                headers=headers,
                timeout=10,
                verify=verify_tls,
            )
            if response.status_code != 200:
                continue
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error trying URL {url}: {e}")
            continue

        size = _nexus_payload_size(data)
        if size:
            return size
    return None
def get_image_size(registry, repository, tag, full_image, namespace=None, deployment_name=None, container_name=None):
    """Get an image's size, falling back to alternative methods when needed.

    Docker Hub images use the Docker Hub API. Registries listed in
    ``NEXUS_REGISTRIES`` are first tried with their configured
    credentials. Registries that look like Nexus ("nexus" in the host name
    or one of the ports 8081/8443/5000) are then tried anonymously, with
    credentials from the environment, and with well-known defaults.
    If none of that yields a size, ``get_image_size_alternative_methods``
    is used.

    Environment variables: ``NEXUS_USERNAME_<REGISTRY>`` and
    ``NEXUS_PASSWORD_<REGISTRY>`` (dots and colons replaced by underscores,
    upper-cased) are used when both are set; otherwise ``NEXUS_USERNAME``
    and ``NEXUS_PASSWORD``.

    Returns:
        Size in bytes as returned by the first method that succeeds.
    """
    # Configuration for Nexus registry
    NEXUS_REGISTRIES = {
        # Add your Nexus registry configurations here
        # Format: 'registry_host': {'username': 'your_username', 'password': 'your_password'}
        # Example:
        # 'nexus.company.com': {'username': 'admin', 'password': 'admin123'},
        # 'nexus.company.com:8443': {'username': 'admin', 'password': 'admin123'},
    }

    def fallback():
        return get_image_size_alternative_methods(
            registry, repository, tag, full_image,
            namespace, deployment_name, container_name,
        )

    if registry == 'docker.io':
        # If Docker Hub fails, try alternative methods
        return get_dockerhub_image_size(repository, tag) or fallback()

    if registry in NEXUS_REGISTRIES:
        # Use configured credentials for Nexus
        creds = NEXUS_REGISTRIES[registry]
        size = get_nexus_image_size(registry, repository, tag,
                                    creds.get('username'), creds.get('password'))
        if size:
            return size

    looks_like_nexus = 'nexus' in registry.lower() or any(
        port in registry for port in (':8081', ':8443', ':5000')
    )
    if looks_like_nexus:
        # Credentials are taken as a pair: the registry-specific pair when
        # it is complete, otherwise the generic pair.
        env_suffix = registry.replace(".", "_").replace(":", "_").upper()
        username = os.getenv(f'NEXUS_USERNAME_{env_suffix}')
        password = os.getenv(f'NEXUS_PASSWORD_{env_suffix}')
        if not (username and password):
            username = os.getenv('NEXUS_USERNAME')
            password = os.getenv('NEXUS_PASSWORD')

        attempts = [(None, None)]  # anonymous access first
        if username and password:
            attempts.append((username, password))
        # NOTE(review): probing well-known default credentials against a
        # registry can trigger account lockouts or security alerts;
        # confirm this is acceptable in the target environment.
        attempts.extend([
            ('admin', 'admin123'),
            ('admin', 'admin'),
            ('nexus', 'nexus'),
            ('docker', 'docker'),
        ])
        for user, secret in attempts:
            size = get_nexus_image_size(registry, repository, tag, user, secret)
            if size:
                return size

    # If all registry-specific methods fail, try alternative methods
    return fallback()
def format_size(size_bytes):
    """Render a byte count as a short human-readable string.

    Args:
        size_bytes: Size in bytes, or None when the size is not known.

    Returns:
        "Unknown" for None; otherwise the value scaled by successive
        factors of 1024 and shown with one decimal place and a unit
        suffix (B, KB, MB, GB, or TB for everything beyond GB).
    """
    if size_bytes is None:
        return "Unknown"

    # Work on a local copy and divide it down until it fits the current unit.
    value = size_bytes
    for suffix in ('B', 'KB', 'MB', 'GB'):
        if value < 1024.0:
            return f"{value:.1f} {suffix}"
        value = value / 1024.0

    # Anything that is still >= 1024 GB is reported in terabytes.
    return f"{value:.1f} TB"
def extract_images_from_deployments(deployments):
    """Collect one record per container image found in the given deployments.

    Args:
        deployments: Iterable of deployment dicts; each is read through
            metadata.namespace, metadata.name and spec.template.spec.

    Returns:
        A list of dicts with the keys 'namespace', 'deployment',
        'container', 'registry', 'repository', 'tag' and 'full_image'.
        For every deployment the regular containers come first, followed
        by the init containers, whose names carry an " (init)" suffix.
        Containers without an 'image' key are skipped.
    """
    # (key inside the pod spec, whether the entries are init containers)
    container_groups = (
        ('containers', False),
        ('initContainers', True),
    )

    records = []
    for item in deployments:
        namespace = item.get('metadata', {}).get('namespace', 'default')
        deployment_name = item.get('metadata', {}).get('name', 'unknown')
        pod_spec = item.get('spec', {}).get('template', {}).get('spec', {})

        for group_key, is_init in container_groups:
            for entry in pod_spec.get(group_key, []):
                if 'image' not in entry:
                    continue

                parsed = parse_image_info(entry['image'])
                entry_name = entry.get('name', 'unknown')
                # Init containers are marked so they can be told apart later.
                label = f"{entry_name} (init)" if is_init else entry_name

                records.append({
                    'namespace': namespace,
                    'deployment': deployment_name,
                    'container': label,
                    'registry': parsed['registry'],
                    'repository': parsed['repository'],
                    'tag': parsed['tag'],
                    'full_image': parsed['full_image'],
                })

    return records
def _autofit_column_widths(worksheet, max_width=50):
    """Set each column's width to its longest cell text plus padding, capped at max_width."""
    for column_cells in worksheet.columns:
        column_letter = column_cells[0].column_letter
        # str() is applied to every value (including None) so headers and
        # data cells are measured the same way; no blanket except is needed.
        longest = max((len(str(cell.value)) for cell in column_cells), default=0)
        worksheet.column_dimensions[column_letter].width = min(longest + 2, max_width)


def main():
    """Run the full extraction: query deployments, size images, export to Excel.

    Steps:
        1. Verify kubectl responds to "kubectl version --client".
        2. Fetch all deployments and extract their container images.
        3. Look up the size of every image via get_image_size.
        4. Write the sorted result to a timestamped .xlsx file and print
           a short summary.

    Exits the process with status 1 when kubectl is unavailable, when no
    deployments are returned, or when no images are found.
    """
    print("Kubernetes Deployment Image Extractor")
    print("=" * 40)

    # Check if kubectl is available
    if run_kubectl_command("kubectl version --client") is None:
        print("Error: kubectl is not available or not configured properly")
        sys.exit(1)

    # Get all deployments
    deployments = get_all_deployments()
    if not deployments:
        print("No deployments found or error occurred")
        sys.exit(1)
    print(f"Found {len(deployments)} deployments")

    # Extract image information
    images = extract_images_from_deployments(deployments)
    if not images:
        print("No images found in deployments")
        sys.exit(1)
    print(f"Found {len(images)} images")

    # Get image sizes (this might take a while for many images)
    print("Getting image sizes...")
    total = len(images)
    for position, image in enumerate(images, start=1):
        print(f"Processing {position}/{total}: {image['full_image']}")
        size = get_image_size(
            image['registry'],
            image['repository'],
            image['tag'],
            image['full_image'],
            image['namespace'],
            image['deployment'],
            image['container'],
        )
        image['size_bytes'] = size
        image['size_formatted'] = format_size(size)

    # Build the table, sorted by repository and then by tag
    df = pd.DataFrame(images).sort_values(['repository', 'tag'])

    # Internal column name -> header shown in the Excel sheet (in sheet order)
    column_headers = {
        'namespace': 'Namespace',
        'deployment': 'Deployment',
        'container': 'Container',
        'registry': 'Registry',
        'repository': 'Repository',
        'tag': 'Tag',
        'size_formatted': 'Size',
        'full_image': 'Full Image',
    }
    df_final = df[list(column_headers)].copy()
    df_final.columns = list(column_headers.values())

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"k8s_deployment_images_{timestamp}.xlsx"

    # Export to Excel
    print(f"Exporting to {filename}...")
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df_final.to_excel(writer, sheet_name='Deployment Images', index=False)
        # Auto-adjust column widths before the writer saves the workbook
        _autofit_column_widths(writer.sheets['Deployment Images'])
    print(f"Successfully exported {len(df_final)} image records to {filename}")

    # Print summary
    print("\nSummary:")
    print(f"Total images: {len(df_final)}")
    print(f"Unique repositories: {df_final['Repository'].nunique()}")
    print(f"Registries used: {', '.join(df_final['Registry'].unique())}")

    # Show sample data
    print("\nSample data:")
    print(df_final.head(10).to_string(index=False))
# Run the extractor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()