#!/usr/bin/env python3
"""
K8s Node Labeling Management Tool
폐쇄망 환경에서 CSV/Excel 파일을 기반으로 K8s node labeling을 관리하는 도구
"""
import argparse
import os
import sys
import subprocess
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
from tabulate import tabulate
import yaml
class ColoredOutput:
"""컬러 출력을 위한 클래스"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
@classmethod
def print_header(cls, text: str):
print(f"{cls.HEADER}{cls.BOLD}{text}{cls.ENDC}")
@classmethod
def print_success(cls, text: str):
print(f"{cls.OKGREEN}✓ {text}{cls.ENDC}")
@classmethod
def print_warning(cls, text: str):
print(f"{cls.WARNING}⚠ {text}{cls.ENDC}")
@classmethod
def print_error(cls, text: str):
print(f"{cls.FAIL}✗ {text}{cls.ENDC}")
@classmethod
def print_info(cls, text: str):
print(f"{cls.OKCYAN}ℹ {text}{cls.ENDC}")
class K8sNodeManager:
"""K8s 노드 관리 클래스"""
def __init__(self):
self.kubectl_available = self._check_kubectl()
def _check_kubectl(self) -> bool:
"""kubectl 명령어 사용 가능 여부 확인"""
try:
result = subprocess.run(['kubectl', 'version', '--client'],
capture_output=True, text=True, timeout=10)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def get_current_nodes(self) -> Dict[str, Dict]:
"""현재 K8s 클러스터의 모든 노드 정보 조회"""
if not self.kubectl_available:
raise RuntimeError("kubectl 명령어를 사용할 수 없습니다. K8s 클러스터 연결을 확인해주세요.")
try:
# 노드 목록과 IP 정보 조회
result = subprocess.run([
'kubectl', 'get', 'nodes', '-o', 'json'
], capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"노드 정보 조회 실패: {result.stderr}")
nodes_data = json.loads(result.stdout)
nodes_info = {}
for node in nodes_data['items']:
node_name = node['metadata']['name']
# 노드 IP 찾기
node_ip = None
for address in node['status'].get('addresses', []):
if address['type'] == 'InternalIP':
node_ip = address['address']
break
# 노드 라벨 정보
labels = node['metadata'].get('labels', {})
nodes_info[node_name] = {
'ip': node_ip,
'labels': labels,
'ready': self._is_node_ready(node)
}
return nodes_info
except (subprocess.TimeoutExpired, json.JSONDecodeError) as e:
raise RuntimeError(f"노드 정보 처리 중 오류 발생: {str(e)}")
def _is_node_ready(self, node_data: Dict) -> bool:
"""노드가 Ready 상태인지 확인"""
for condition in node_data['status'].get('conditions', []):
if condition['type'] == 'Ready':
return condition['status'] == 'True'
return False
def apply_node_label(self, node_name: str, label_key: str, label_value: str) -> bool:
"""노드에 라벨 적용"""
try:
cmd = ['kubectl', 'label', 'node', node_name, f'{label_key}={label_value}', '--overwrite']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode == 0:
ColoredOutput.print_success(f"노드 {node_name}에 라벨 적용: {label_key}={label_value}")
return True
else:
ColoredOutput.print_error(f"노드 {node_name} 라벨 적용 실패: {result.stderr.strip()}")
return False
except subprocess.TimeoutExpired:
ColoredOutput.print_error(f"노드 {node_name} 라벨 적용 시간 초과")
return False
class FileProcessor:
"""파일 처리 클래스"""
@staticmethod
def read_management_file(file_path: str) -> pd.DataFrame:
"""관리 대장 파일 읽기 (CSV, XLS, XLSX 지원)"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"파일을 찾을 수 없습니다: {file_path}")
file_ext = file_path.suffix.lower()
try:
if file_ext == '.csv':
# Tab 구분자 CSV 파일
df = pd.read_csv(file_path, sep='\t', encoding='utf-8')
elif file_ext in ['.xls', '.xlsx']:
# Excel 파일
df = pd.read_excel(file_path)
else:
raise ValueError(f"지원하지 않는 파일 형식: {file_ext}")
# 필수 컬럼 확인
required_columns = ['hostname', 'ip', 'label']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"필수 컬럼이 누락되었습니다: {missing_columns}")
# 데이터 정리
df = df.dropna(subset=['hostname', 'ip', 'label'])
df['hostname'] = df['hostname'].astype(str).str.strip()
df['ip'] = df['ip'].astype(str).str.strip()
df['label'] = df['label'].astype(str).str.strip()
ColoredOutput.print_success(f"파일 읽기 완료: {len(df)}개 레코드")
return df
except Exception as e:
raise RuntimeError(f"파일 처리 중 오류 발생: {str(e)}")
class NodeLabelingManager:
"""노드 라벨링 관리 메인 클래스"""
def __init__(self):
self.k8s_manager = K8sNodeManager()
self.file_processor = FileProcessor()
def add_labels_from_file(self, file_path: str):
"""파일에서 라벨 정보를 읽어 노드에 적용"""
ColoredOutput.print_header("📋 노드 라벨링 추가 작업 시작")
# 파일 읽기
try:
df = self.file_processor.read_management_file(file_path)
except Exception as e:
ColoredOutput.print_error(f"파일 읽기 실패: {e}")
return False
# 현재 K8s 노드 정보 조회
try:
current_nodes = self.k8s_manager.get_current_nodes()
except Exception as e:
ColoredOutput.print_error(f"K8s 노드 정보 조회 실패: {e}")
return False
# 매칭되는 노드 찾기
matching_operations = []
unmatched_records = []
for _, row in df.iterrows():
hostname = row['hostname']
ip = row['ip']
label_value = row['label']
# hostname으로 먼저 매칭 시도
matched_node = None
for node_name, node_info in current_nodes.items():
if node_name == hostname or node_info['ip'] == ip:
matched_node = node_name
break
if matched_node:
matching_operations.append({
'node_name': matched_node,
'file_hostname': hostname,
'file_ip': ip,
'label_value': label_value,
'current_ip': current_nodes[matched_node]['ip']
})
else:
unmatched_records.append({
'hostname': hostname,
'ip': ip,
'label': label_value
})
# 결과 출력
if matching_operations:
ColoredOutput.print_info(f"\n매칭된 노드 ({len(matching_operations)}개):")
headers = ['노드명', '파일 hostname', '파일 IP', '노드 IP', '적용할 라벨']
table_data = []
for op in matching_operations:
table_data.append([
op['node_name'],
op['file_hostname'],
op['file_ip'],
op['current_ip'],
f"node={op['label_value']}"
])
print(tabulate(table_data, headers=headers, tablefmt="grid"))
if unmatched_records:
ColoredOutput.print_warning(f"\n매칭되지 않은 레코드 ({len(unmatched_records)}개):")
headers = ['hostname', 'IP', 'label']
table_data = [[r['hostname'], r['ip'], r['label']] for r in unmatched_records]
print(tabulate(table_data, headers=headers, tablefmt="grid"))
if not matching_operations:
ColoredOutput.print_warning("적용할 라벨이 없습니다.")
return False
# 사용자 확인
print(f"\n{len(matching_operations)}개 노드에 라벨을 적용하시겠습니까?")
response = input("계속하려면 'yes' 또는 'y'를 입력하세요: ").strip().lower()
if response not in ['yes', 'y']:
ColoredOutput.print_info("작업이 취소되었습니다.")
return False
# 라벨 적용 실행
ColoredOutput.print_header("\n🚀 라벨 적용 실행 중...")
success_count = 0
for op in matching_operations:
success = self.k8s_manager.apply_node_label(
op['node_name'],
'node',
op['label_value']
)
if success:
success_count += 1
# 최종 결과
ColoredOutput.print_header(f"\n📊 작업 완료 결과")
ColoredOutput.print_success(f"성공: {success_count}/{len(matching_operations)}개")
if success_count < len(matching_operations):
ColoredOutput.print_warning(f"실패: {len(matching_operations) - success_count}개")
return success_count > 0
def list_node_labels(self):
"""모든 노드의 라벨 정보를 가독성 좋게 출력"""
ColoredOutput.print_header("📋 K8s 노드 라벨 정보")
try:
current_nodes = self.k8s_manager.get_current_nodes()
except Exception as e:
ColoredOutput.print_error(f"노드 정보 조회 실패: {e}")
return False
if not current_nodes:
ColoredOutput.print_warning("조회된 노드가 없습니다.")
return False
# 1. 노드 개요 정보 출력
self._print_nodes_overview(current_nodes)
# 2. 사용자 정의 라벨 요약 (node= 라벨)
self._print_custom_labels_summary(current_nodes)
# 3. 각 노드별 상세 라벨 정보
self._print_detailed_node_labels(current_nodes)
return True
def _print_nodes_overview(self, nodes: Dict[str, Dict]):
"""노드 개요 정보 출력"""
ColoredOutput.print_header("\n🖥️ 노드 개요")
headers = ['노드명', 'IP 주소', '상태', '총 라벨 수']
table_data = []
for node_name, node_info in nodes.items():
status = '🟢 Ready' if node_info['ready'] else '🔴 NotReady'
ip = node_info['ip'] or '-'
label_count = len(node_info['labels'])
table_data.append([node_name, ip, status, label_count])
print(tabulate(table_data, headers=headers, tablefmt="rounded_outline"))
def _print_custom_labels_summary(self, nodes: Dict[str, Dict]):
"""사용자 정의 라벨 요약 출력 (node= 라벨 중심)"""
ColoredOutput.print_header("\n🏷️ 사용자 정의 라벨 (node=)")
# node= 라벨이 있는 노드들 찾기
custom_labeled_nodes = {}
for node_name, node_info in nodes.items():
if 'node' in node_info['labels']:
custom_labeled_nodes[node_name] = node_info['labels']['node']
if custom_labeled_nodes:
headers = ['노드명', '라벨 값']
table_data = [[node, label] for node, label in custom_labeled_nodes.items()]
print(tabulate(table_data, headers=headers, tablefmt="rounded_outline"))
else:
ColoredOutput.print_info("사용자 정의 라벨(node=)이 설정된 노드가 없습니다.")
def _print_detailed_node_labels(self, nodes: Dict[str, Dict]):
"""각 노드별 상세 라벨 정보 출력"""
ColoredOutput.print_header("\n📋 노드별 상세 라벨 정보")
for i, (node_name, node_info) in enumerate(nodes.items()):
if i > 0:
print() # 노드 간 구분선
# 노드 헤더
status_emoji = '🟢' if node_info['ready'] else '🔴'
ColoredOutput.print_info(f"{status_emoji} {node_name} ({node_info['ip']})")
# 라벨 분류
labels = node_info['labels']
system_labels = {}
custom_labels = {}
# 시스템 라벨과 사용자 라벨 분리
system_prefixes = [
'kubernetes.io/', 'node.kubernetes.io/', 'beta.kubernetes.io/',
'topology.kubernetes.io/', 'node-role.kubernetes.io/'
]
for key, value in labels.items():
is_system = any(key.startswith(prefix) for prefix in system_prefixes)
if is_system:
system_labels[key] = value
else:
custom_labels[key] = value
# 사용자 정의 라벨 출력 (우선)
if custom_labels:
print(" 📌 사용자 정의 라벨:")
for key, value in sorted(custom_labels.items()):
if key == 'node':
# node= 라벨은 강조 표시
print(f" 🎯 {ColoredOutput.OKGREEN}{key}{ColoredOutput.ENDC} = {ColoredOutput.BOLD}{value}{ColoredOutput.ENDC}")
else:
print(f" 🔹 {key} = {value}")
# 시스템 라벨 출력 (접을 수 있는 형태)
if system_labels:
print(f" ⚙️ 시스템 라벨 ({len(system_labels)}개):")
# 중요한 시스템 라벨만 먼저 표시
important_system_labels = {
k: v for k, v in system_labels.items()
if any(important in k for important in ['arch', 'os', 'instance-type', 'zone', 'role'])
}
if important_system_labels:
for key, value in sorted(important_system_labels.items()):
short_key = key.split('/')[-1] # 마지막 부분만 표시
print(f" 📋 {short_key} = {value}")
# 나머지 시스템 라벨은 요약으로 표시
remaining_count = len(system_labels) - len(important_system_labels)
if remaining_count > 0:
print(f" 💭 기타 시스템 라벨: {remaining_count}개")
print(f" (전체 목록을 보려면 'kubectl describe node {node_name}' 실행)")
if not custom_labels and not system_labels:
print(" 📝 라벨이 없습니다.")
# 전체 요약
total_nodes = len(nodes)
total_labels = sum(len(node_info['labels']) for node_info in nodes.values())
ready_nodes = sum(1 for node_info in nodes.values() if node_info['ready'])
ColoredOutput.print_header(f"\n📊 요약")
print(f"총 노드: {total_nodes}개 (Ready: {ready_nodes}개)")
print(f"총 라벨: {total_labels}개")
# 사용자 정의 라벨 통계
custom_label_nodes = sum(1 for node_info in nodes.values() if 'node' in node_info['labels'])
if custom_label_nodes > 0:
print(f"사용자 정의 라벨(node=) 적용 노드: {custom_label_nodes}개")
return True
def main():
"""메인 함수"""
parser = argparse.ArgumentParser(
description="K8s Node Labeling Management Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
사용 예시:
python node-labeling-mgmt.py data.csv add # CSV 파일로부터 라벨 추가
python node-labeling-mgmt.py data.xlsx add # Excel 파일로부터 라벨 추가
python node-labeling-mgmt.py node list # 현재 노드 라벨 정보 출력
"""
)
parser.add_argument('target', help='파일 경로 또는 "node"')
parser.add_argument('action', help='수행할 작업: "add" 또는 "list"')
args = parser.parse_args()
manager = NodeLabelingManager()
try:
if args.target == 'node' and args.action == 'list':
# 노드 라벨 목록 출력
success = manager.list_node_labels()
elif args.action == 'add':
# 파일에서 라벨 추가
success = manager.add_labels_from_file(args.target)
else:
ColoredOutput.print_error("잘못된 인수입니다. 도움말을 확인하세요.")
parser.print_help()
sys.exit(1)
sys.exit(0 if success else 1)
except KeyboardInterrupt:
ColoredOutput.print_warning("\n작업이 사용자에 의해 중단되었습니다.")
sys.exit(1)
except Exception as e:
ColoredOutput.print_error(f"예상치 못한 오류 발생: {e}")
sys.exit(1)
if name == "main":
main()