#!/usr/bin/env python3
"""
etcd-metrics.py v1.4 - Python 기반 etcd 메트릭 수집 및 컬러 테이블 출력
v1.4 변경사항:
v1.3 변경사항:
Author: System Administrator
Version: 1.4
"""
import subprocess
import json
import re
import os
import sys
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import logging
try:
import pandas as pd
from tabulate import tabulate
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, BarColumn, TextColumn
from rich.panel import Panel
from rich.text import Text
from rich.columns import Columns
from rich import box
import colorama
from colorama import Fore, Back, Style
# Windows에서 색상 지원
colorama.init(autoreset=True)
except ImportError as e:
print(f"필수 모듈이 설치되지 않았습니다: {e}")
print("다음 명령어로 설치하세요:")
print("pip3 install pandas tabulate rich colorama")
sys.exit(1)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger(name)
class ETCDMetricsCollector:
def init(self):
self.etcdctl_cmd = "etcdctl"
self.endpoints = ""
self.ca_file = ""
self.cert_file = ""
self.key_file = ""
self.etcd_config = {}
self.metrics = []
self.timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.console = Console()
# 상태별 색상 정의
self.status_colors = {
'success': 'green',
'current': 'bright_blue',
'configured': 'cyan',
'default': 'yellow',
'calculated': 'magenta',
'measured': 'bright_green',
'found': 'green',
'error': 'red',
'warning': 'yellow',
'info': 'blue'
}
def add_metric(self, metric: str, value: str, unit: str = "", status: str = "success", category: str = "general"):
"""메트릭을 리스트에 추가"""
self.metrics.append({
'Category': category,
'Metric': metric,
'Value': value,
'Unit': unit,
'Status': status,
'Timestamp': self.timestamp
})
def print_colored_log(self, message: str, level: str = "info"):
"""색상이 있는 로그 출력"""
colors = {
"info": Fore.BLUE,
"success": Fore.GREEN,
"warning": Fore.YELLOW,
"error": Fore.RED
}
icons = {
"info": "ℹ️",
"success": "✅",
"warning": "⚠️",
"error": "❌"
}
color = colors.get(level, Fore.WHITE)
icon = icons.get(level, "•")
print(f"{color}{icon} {message}{Style.RESET_ALL}", file=sys.stderr)
def format_bytes(self, bytes_value: int) -> str:
"""바이트를 읽기 쉬운 형태로 변환"""
if bytes_value >= 1073741824: # GB
return f"{bytes_value / 1073741824:.1f}GB"
elif bytes_value >= 1048576: # MB
return f"{bytes_value / 1048576:.1f}MB"
elif bytes_value >= 1024: # KB
return f"{bytes_value / 1024:.1f}KB"
else:
return f"{bytes_value}B"
def get_status_color(self, status: str, value: str = "") -> str:
"""상태에 따른 색상 반환"""
# 특별한 경우들
if status == "error" or "failed" in value.lower():
return "red"
elif status == "warning" or (value.replace('.','').isdigit() and float(value) > 80):
return "yellow"
elif status in ["success", "current", "found"]:
return "green"
elif status == "configured":
return "cyan"
elif status == "calculated":
return "magenta"
else:
return self.status_colors.get(status, "white")
def run_command(self, cmd: str, shell: bool = True) -> Tuple[str, int]:
"""명령어 실행 및 결과 반환"""
try:
result = subprocess.run(
cmd, shell=shell, capture_output=True, text=True, timeout=30
)
return result.stdout.strip(), result.returncode
except subprocess.TimeoutExpired:
logger.error(f"명령어 실행 타임아웃: {cmd}")
return "", 1
except Exception as e:
logger.error(f"명령어 실행 실패: {cmd}, 에러: {e}")
return "", 1
def find_etcd_process(self) -> Optional[str]:
"""ps 명령어로 etcd 프로세스 찾기"""
self.print_colored_log("etcd 프로세스 검색 중...", "info")
cmd = "ps -ef | grep etcd | grep -v grep"
output, returncode = self.run_command(cmd)
if returncode != 0 or not output:
self.print_colored_log("실행 중인 etcd 프로세스를 찾을 수 없습니다", "error")
return None
# 여러 프로세스가 있는 경우 첫 번째 선택
lines = output.split('\n')
for line in lines:
if 'etcd' in line and '--' in line:
self.print_colored_log(f"etcd 프로세스 발견: {line.split()[-1] if line.split() else 'unknown'}", "success")
return line
return None
def extract_cert_files_from_process(self, process_line: str) -> bool:
"""프로세스 라인에서 인증서 파일 경로 추출"""
self.print_colored_log("프로세스 파라미터에서 인증서 파일 추출 중...", "info")
# --etcd-cafile, --etcd-certfile, --etcd-keyfile 패턴 매칭
cert_patterns = {
'ca': [r'--etcd-cafile[=\s]+([^\s]+)', r'--trusted-ca-file[=\s]+([^\s]+)', r'--cacert[=\s]+([^\s]+)'],
'cert': [r'--etcd-certfile[=\s]+([^\s]+)', r'--cert-file[=\s]+([^\s]+)', r'--cert[=\s]+([^\s]+)'],
'key': [r'--etcd-keyfile[=\s]+([^\s]+)', r'--key-file[=\s]+([^\s]+)', r'--key[=\s]+([^\s]+)']
}
found_files = {}
for cert_type, patterns in cert_patterns.items():
for pattern in patterns:
match = re.search(pattern, process_line)
if match:
file_path = match.group(1).strip()
if os.path.isfile(file_path):
found_files[cert_type] = file_path
self.print_colored_log(f"{cert_type} 파일 발견: {file_path}", "success")
break
if len(found_files) == 3:
self.ca_file = found_files['ca']
self.cert_file = found_files['cert']
self.key_file = found_files['key']
self.add_metric("cert_ca_file", self.ca_file, "path", "found", "configuration")
self.add_metric("cert_cert_file", self.cert_file, "path", "found", "configuration")
self.add_metric("cert_key_file", self.key_file, "path", "found", "configuration")
return True
else:
self.print_colored_log(f"일부 인증서 파일을 찾을 수 없습니다. 발견된 파일: {found_files}", "warning")
return False
def extract_endpoints_from_process(self, process_line: str) -> bool:
"""프로세스 라인에서 엔드포인트 추출"""
self.print_colored_log("프로세스 파라미터에서 엔드포인트 추출 중...", "info")
endpoint_patterns = [
r'--listen-client-urls[=\s]+([^\s]+)',
r'--advertise-client-urls[=\s]+([^\s]+)',
r'--endpoints[=\s]+([^\s]+)'
]
for pattern in endpoint_patterns:
match = re.search(pattern, process_line)
if match:
urls = match.group(1).strip()
# 첫 번째 URL 사용
self.endpoints = urls.split(',')[0]
self.print_colored_log(f"엔드포인트 발견: {self.endpoints}", "success")
self.add_metric("endpoints", self.endpoints, "url", "found", "configuration")
return True
# 기본값 시도
default_endpoints = ["https://127.0.0.1:2379", "http://127.0.0.1:2379"]
for endpoint in default_endpoints:
if self.test_etcd_connection(endpoint):
self.endpoints = endpoint
self.add_metric("endpoints", self.endpoints, "url", "default", "configuration")
return True
return False
def extract_config_from_process(self, process_line: str):
"""프로세스 라인에서 설정값들 추출"""
self.print_colored_log("프로세스 파라미터에서 설정값 추출 중...", "info")
config_patterns = {
'quota_backend_bytes': [r'--quota-backend-bytes[=\s]+([^\s]+)'],
'max_request_bytes': [r'--max-request-bytes[=\s]+([^\s]+)'],
'snapshot_count': [r'--snapshot-count[=\s]+([^\s]+)'],
'wal_dir': [r'--wal-dir[=\s]+([^\s]+)'],
'data_dir': [r'--data-dir[=\s]+([^\s]+)'],
'heartbeat_interval': [r'--heartbeat-interval[=\s]+([^\s]+)'],
'election_timeout': [r'--election-timeout[=\s]+([^\s]+)']
}
# 기본값 설정
defaults = {
'quota_backend_bytes': '2147483648', # 2GB
'max_request_bytes': '1572864', # 1.5MB
'snapshot_count': '100000',
'wal_dir': '/var/lib/etcd/member/wal',
'data_dir': '/var/lib/etcd',
'heartbeat_interval': '100', # 100ms
'election_timeout': '1000' # 1000ms
}
units = {
'quota_backend_bytes': 'bytes',
'max_request_bytes': 'bytes',
'snapshot_count': 'transactions',
'wal_dir': 'path',
'data_dir': 'path',
'heartbeat_interval': 'ms',
'election_timeout': 'ms'
}
for config_key, patterns in config_patterns.items():
found = False
for pattern in patterns:
match = re.search(pattern, process_line)
if match:
value = match.group(1).strip()
self.etcd_config[config_key] = value
self.add_metric(config_key, value, units.get(config_key, ""), "configured", "configuration")
found = True
break
if not found:
# 기본값 사용
default_value = defaults.get(config_key, "unknown")
self.etcd_config[config_key] = default_value
self.add_metric(config_key, default_value, units.get(config_key, ""), "default", "configuration")
def test_etcd_connection(self, endpoint: str = None) -> bool:
"""etcd 연결 테스트"""
if endpoint:
test_endpoint = endpoint
else:
test_endpoint = self.endpoints
env = os.environ.copy()
env['ETCDCTL_API'] = '3'
env['ETCDCTL_ENDPOINTS'] = test_endpoint
if self.ca_file and self.cert_file and self.key_file:
env['ETCDCTL_CACERT'] = self.ca_file
env['ETCDCTL_CERT'] = self.cert_file
env['ETCDCTL_KEY'] = self.key_file
try:
result = subprocess.run(
[self.etcdctl_cmd, 'endpoint', 'health'],
env=env, capture_output=True, text=True, timeout=10
)
return result.returncode == 0
except:
return False
def setup_etcdctl_env(self):
"""etcdctl 환경변수 설정"""
os.environ['ETCDCTL_API'] = '3'
os.environ['ETCDCTL_ENDPOINTS'] = self.endpoints
if self.ca_file and self.cert_file and self.key_file:
os.environ['ETCDCTL_CACERT'] = self.ca_file
os.environ['ETCDCTL_CERT'] = self.cert_file
os.environ['ETCDCTL_KEY'] = self.key_file
def run_etcdctl(self, args: List[str]) -> Tuple[str, int]:
"""etcdctl 명령어 실행"""
cmd = [self.etcdctl_cmd] + args
return self.run_command(' '.join(cmd))
def collect_status_metrics(self):
"""etcd 상태 메트릭 수집"""
self.print_colored_log("etcd 상태 메트릭 수집 중...", "info")
output, returncode = self.run_etcdctl(['endpoint', 'status', '--write-out=json'])
if returncode != 0:
self.add_metric("status_collection", "failed", "error", "error", "status")
return
try:
status_data = json.loads(output)
if not status_data:
return
status = status_data[0].get('Status', {})
# DB 메트릭
db_size = status.get('dbSize', 0)
db_size_in_use = status.get('dbSizeInUse', 0)
db_size_limit = status.get('dbSizeLimit', 0)
# 바이트 값을 읽기 쉬운 형태로 표시
self.add_metric("db_size", f"{self.format_bytes(db_size)} ({db_size})", "bytes", "current", "database")
self.add_metric("db_size_in_use", f"{self.format_bytes(db_size_in_use)} ({db_size_in_use})", "bytes", "current", "database")
self.add_metric("db_size_limit", f"{self.format_bytes(db_size_limit)} ({db_size_limit})", "bytes", "current", "database")
# 사용률 계산
if db_size_limit > 0 and db_size > 0:
usage_percent = round((db_size / db_size_limit) * 100, 2)
status_type = "warning" if usage_percent > 80 else "current"
self.add_metric("db_usage_percent", str(usage_percent), "%", status_type, "database")
# 효율성 계산
if db_size > 0 and db_size_in_use > 0:
efficiency = round((db_size_in_use / db_size) * 100, 2)
status_type = "warning" if efficiency < 70 else "current"
self.add_metric("db_efficiency_percent", str(efficiency), "%", status_type, "database")
# 리더 정보
leader_id = status.get('leader', '0')
member_id = status.get('header', {}).get('member_id', '0')
raft_term = status.get('raftTerm', 0)
self.add_metric("leader_id", str(leader_id), "id", "current", "cluster")
self.add_metric("member_id", str(member_id), "id", "current", "cluster")
self.add_metric("raft_term", str(raft_term), "number", "current", "cluster")
is_leader = leader_id == member_id
has_leader = leader_id != '0'
leader_status = "success" if is_leader else "current"
cluster_status = "success" if has_leader else "error"
self.add_metric("is_leader", str(is_leader).lower(), "boolean", leader_status, "cluster")
self.add_metric("has_leader", str(has_leader).lower(), "boolean", cluster_status, "cluster")
# 버전 정보
version = status.get('version', 'unknown')
self.add_metric("etcd_version", version, "version", "current", "system")
# Revision 정보
revision = status.get('revision', 0)
raft_index = status.get('raftIndex', 0)
self.add_metric("revision", f"{revision:,}", "number", "current", "cluster")
self.add_metric("raft_index", f"{raft_index:,}", "number", "current", "cluster")
# 스냅샷 메트릭
snapshot_count = int(self.etcd_config.get('snapshot_count', 100000))
if revision > 0:
estimated_snapshots = revision // snapshot_count
since_last_snapshot = revision % snapshot_count
progress_percent = round((since_last_snapshot / snapshot_count) * 100, 2)
snapshot_status = "warning" if progress_percent > 90 else "current"
self.add_metric("estimated_snapshots", f"{estimated_snapshots:,}", "count", "calculated", "snapshots")
self.add_metric("since_last_snapshot", f"{since_last_snapshot:,}", "transactions", "calculated", "snapshots")
self.add_metric("snapshot_progress", str(progress_percent), "%", snapshot_status, "snapshots")
except json.JSONDecodeError as e:
self.print_colored_log(f"JSON 파싱 실패: {e}", "error")
self.add_metric("status_parsing", "failed", "error", "error", "status")collection", "failed", "error", "error", "status")
return
try:
status_data = json.loads(output)
if not status_data:
return
status = status_data[0].get('Status', {})
# DB 메트릭
db_size = status.get('dbSize', 0)
db_size_in_use = status.get('dbSizeInUse', 0)
db_size_limit = status.get('dbSizeLimit', 0)
self.add_metric("db_size", str(db_size), "bytes", "current", "database")
self.add_metric("db_size_in_use", str(db_size_in_use), "bytes", "current", "database")
self.add_metric("db_size_limit", str(db_size_limit), "bytes", "current", "database")
# 사용률 계산
if db_size_limit > 0 and db_size > 0:
usage_percent = round((db_size / db_size_limit) * 100, 2)
self.add_metric("db_usage_percent", str(usage_percent), "%", "calculated", "database")
# 효율성 계산
if db_size > 0 and db_size_in_use > 0:
efficiency = round((db_size_in_use / db_size) * 100, 2)
self.add_metric("db_efficiency_percent", str(efficiency), "%", "calculated", "database")
# 리더 정보
leader_id = status.get('leader', '0')
member_id = status.get('header', {}).get('member_id', '0')
raft_term = status.get('raftTerm', 0)
self.add_metric("leader_id", str(leader_id), "id", "current", "cluster")
self.add_metric("member_id", str(member_id), "id", "current", "cluster")
self.add_metric("raft_term", str(raft_term), "number", "current", "cluster")
is_leader = leader_id == member_id
has_leader = leader_id != '0'
self.add_metric("is_leader", str(is_leader).lower(), "boolean", "current", "cluster")
self.add_metric("has_leader", str(has_leader).lower(), "boolean", "current", "cluster")
# 버전 정보
version = status.get('version', 'unknown')
self.add_metric("etcd_version", version, "version", "current", "system")
# Revision 정보
revision = status.get('revision', 0)
raft_index = status.get('raftIndex', 0)
self.add_metric("revision", str(revision), "number", "current", "cluster")
self.add_metric("raft_index", str(raft_index), "number", "current", "cluster")
# 스냅샷 메트릭
snapshot_count = int(self.etcd_config.get('snapshot_count', 100000))
if revision > 0:
estimated_snapshots = revision // snapshot_count
since_last_snapshot = revision % snapshot_count
progress_percent = round((since_last_snapshot / snapshot_count) * 100, 2)
self.add_metric("estimated_snapshots", str(estimated_snapshots), "count", "calculated", "snapshots")
self.add_metric("since_last_snapshot", str(since_last_snapshot), "transactions", "calculated", "snapshots")
self.add_metric("snapshot_progress", str(progress_percent), "%", "calculated", "snapshots")
except json.JSONDecodeError as e:
logger.error(f"JSON 파싱 실패: {e}")
self.add_metric("status_parsing", "failed", "error", "error", "status")
def collect_member_metrics(self):
"""클러스터 멤버 메트릭 수집"""
logger.info("클러스터 멤버 메트릭 수집 중...")
output, returncode = self.run_etcdctl(['member', 'list', '--write-out=json'])
if returncode != 0:
self.add_metric("member_collection", "failed", "error", "error", "cluster")
return
try:
members_data = json.loads(output)
member_count = len(members_data.get('members', []))
self.add_metric("member_count", str(member_count), "count", "current", "cluster")
# 건강한 멤버 수 확인
health_output, health_returncode = self.run_etcdctl(['endpoint', 'health', '--write-out=json'])
if health_returncode == 0:
try:
health_data = json.loads(health_output)
healthy_count = sum(1 for endpoint in health_data if endpoint.get('health', False))
self.add_metric("healthy_members", str(healthy_count), "count", "current", "cluster")
except:
self.add_metric("healthy_members", "unknown", "error", "error", "cluster")
except json.JSONDecodeError as e:
logger.error(f"멤버 정보 JSON 파싱 실패: {e}")
self.add_metric("member_parsing", "failed", "error", "error", "cluster")
def collect_alarm_metrics(self):
"""알람 메트릭 수집"""
logger.info("알람 메트릭 수집 중...")
output, returncode = self.run_etcdctl(['alarm', 'list'])
if returncode != 0:
self.add_metric("alarm_collection", "failed", "error", "error", "alarms")
return
if not output.strip():
self.add_metric("active_alarms", "0", "count", "current", "alarms")
else:
alarm_count = len(output.strip().split('\n'))
self.add_metric("active_alarms", str(alarm_count), "count", "current", "alarms")
self.add_metric("alarm_details", output.replace('\n', ' | '), "text", "current", "alarms")
def collect_performance_metrics(self):
"""성능 메트릭 수집"""
logger.info("성능 메트릭 수집 중...")
# 간단한 PUT 성능 테스트
test_key = f"perf-test-{int(datetime.now().timestamp())}"
start_time = datetime.now()
output, returncode = self.run_etcdctl(['put', test_key, 'test-value'])
if returncode == 0:
end_time = datetime.now()
latency_ms = (end_time - start_time).total_seconds() * 1000
self.add_metric("put_latency", f"{latency_ms:.2f}", "ms", "measured", "performance")
# 테스트 키 삭제
self.run_etcdctl(['del', test_key])
else:
self.add_metric("put_latency", "failed", "error", "error", "performance")
def collect_version_metrics(self):
"""버전 메트릭 수집"""
logger.info("버전 메트릭 수집 중...")
# etcdctl 버전
output, returncode = self.run_command("etcdctl version")
if returncode == 0:
lines = output.split('\n')
if lines:
version_line = lines[0]
if 'etcdctl version:' in version_line:
version = version_line.split(':')[1].strip()
self.add_metric("etcdctl_version", version, "version", "current", "system")
def run_collection(self):
"""전체 메트릭 수집 실행"""
logger.info("etcd 메트릭 수집 시작")
# 1. etcd 프로세스 찾기
process_line = self.find_etcd_process()
if not process_line:
logger.error("etcd 프로세스를 찾을 수 없습니다")
return False
# 2. 인증서 파일 추출
if not self.extract_cert_files_from_process(process_line):
logger.error("인증서 파일을 찾을 수 없습니다")
return False
# 3. 엔드포인트 추출
if not self.extract_endpoints_from_process(process_line):
logger.error("etcd 엔드포인트를 찾을 수 없습니다")
return False
# 4. 설정값 추출
self.extract_config_from_process(process_line)
# 5. etcdctl 환경 설정
self.setup_etcdctl_env()
# 6. 연결 테스트
if not self.test_etcd_connection():
logger.error("etcd 연결 테스트 실패")
return False
logger.info("etcd 연결 성공")
# 7. 메트릭 수집
self.collect_status_metrics()
self.collect_member_metrics()
self.collect_alarm_metrics()
self.collect_performance_metrics()
self.collect_version_metrics()
logger.info("메트릭 수집 완료")
return True
def display_results(self):
"""결과를 테이블 형태로 출력"""
if not self.metrics:
print("수집된 메트릭이 없습니다.")
return
# DataFrame 생성
df = pd.DataFrame(self.metrics)
# 카테고리별로 그룹화하여 출력
categories = df['Category'].unique()
print("\n" + "="*80)
print(f"ETCD 메트릭 수집 결과 - {self.timestamp}")
print("="*80)
for category in sorted(categories):
category_df = df[df['Category'] == category].copy()
category_df = category_df.drop('Category', axis=1)
print(f"\n📊 {category.upper()} 메트릭")
print("-" * 60)
# 테이블 출력
table = tabulate(
category_df.values,
headers=category_df.columns,
tablefmt='grid',
stralign='left'
)
print(table)
# 요약 통계
print(f"\n📈 수집 요약")
print("-" * 30)
summary_stats = df.groupby(['Category', 'Status']).size().unstack(fill_value=0)
print(tabulate(summary_stats, headers=summary_stats.columns, tablefmt='grid'))
# CSV 파일로도 저장
csv_filename = f"etcd_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(csv_filename, index=False)
print(f"\n💾 결과가 {csv_filename} 파일로 저장되었습니다.")
def main():
"""메인 함수"""
print("ETCD 메트릭 수집 도구 v1.3")
print("Python 기반 자동 설정 탐지 및 테이블 출력")
print("-" * 50)
collector = ETCDMetricsCollector()
if collector.run_collection():
collector.display_results()
else:
print("❌ 메트릭 수집에 실패했습니다.")
sys.exit(1)
if name == "main":
main()