규모가 크고 중요한 설계 문서이므로, 전체 아키텍처와 핵심 코드를 포함한 단일 마크다운 파일로 작성합니다.전체 아키텍처 설계서와 핵심 코드를 담은 문서를 작성했습니다.
8개 핵심 코드 모듈이 포함되어 있으며, 각각의 역할과 연결 구조는 다음과 같습니다.
데이터 수집 → 정제 → 저장 흐름:
incremental_export.py — Confluence CQL 증분 추출, 파일 해시 비교로 중복 방지, MinIO raw/ 저장processor.py — LangChain + 로컬 LLM(vLLM)으로 분류·링크수정·YAML 메타데이터 자동 생성, 중복 문서 감지 및 deprecated 처리sync_to_git.sh — MinIO wiki/ → Git 동기화, lifecycle_manager 연계 호출lifecycle_manager.py — 180일 미검증 시 Slack 알림, 365일 경과 시 archive/ 자동 이동인덱싱 및 검색:
indexer.py — RecordManager로 증분 인덱싱(변경분만), 삭제된 문서 자동 정리, Milvus Hybrid SearchAIOps 모니터링:
aiops_agent.py — LangGraph 6-Node 워크플로우. 신뢰도 낮으면 자동 재검색 루프, K8s 실시간 컨텍스트 수집, 근본 원인 분석 → 해결책 → 리스크 예측까지 순차 실행api_server.py — FastAPI 서버. Alertmanager Webhook 수신, Lens에서 HTTP 호출, 정기 리스크 스캔 엔드포인트 제공argo-pipeline.yaml — 전체 6단계 파이프라인을 CronWorkflow로 오케스트레이션생애주기 관리는 YAML status 필드(draft → active → deprecated → archive)를 중심으로 전 단계에 걸쳐 반영했고, GitLab 전환 대비 .gitlab-ci.yml도 포함되어 있습니다.
===
대상 환경: K8s 1,000노드 클러스터 / Cilium / MinIO AIStor / Airgapped / Bitbucket → GitLab 전환 예정
목표: Confluence 문서를 LLM Wiki로 자동 변환하고, 실시간 K8s 상태와 결합하여 장애 원인 분석·해결책 제시·리스크 예측이 가능한 완전 자동화 AIOps 시스템 구축
┌─────────────────────────────────────────────────────────────────────────┐
│ DATA INGESTION LAYER │
│ Confluence ──► K8s CronJob(증분추출) ──► MinIO AIStor [raw/] │
│ Web Vendor Docs ──► Firecrawl(Self-hosted) ──► MinIO AIStor [raw/] │
└──────────────────────────────┬──────────────────────────────────────────┘
│ Argo Workflows (Event-Driven)
┌──────────────────────────────▼──────────────────────────────────────────┐
│ INTELLIGENCE PROCESSING LAYER │
│ MinIO [raw/] ──► LangChain Processor │
│ ├─ Link Resolver (Internal Links 수정) │
│ ├─ LLM Classifier (vLLM/Ollama - 폐쇄망) │
│ ├─ Metadata YAML 자동 생성 │
│ └─ Lifecycle Status 관리 │
└──────────────────────────────┬──────────────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────────────┐
│ KNOWLEDGE STORAGE LAYER │
│ Git (Bitbucket/GitLab) │
│ ├─ /raw/ ← MinIO에서 가져온 원본 MD │
│ └─ /wiki/ ← LLM이 정제한 구조화 지식 │
│ ├─ /active/Architecture, SOP, Library, Reports │
│ └─ /archive/ ← 폐기된 구버전 문서 │
└──────────┬──────────────────────────────────────────────────────────────┘
│ GitHub Actions / GitLab CI (wiki/** push trigger)
┌──────────▼──────────────────────────────────────────────────────────────┐
│ VECTOR INDEX LAYER │
│ LangChain Indexing API + RecordManager ──► Milvus (K8s Operator) │
│ ├─ Hybrid Search: BM25(Keyword) + Vector(Semantic) │
│ └─ Metadata Filter: status, category, tech_stack, last_verified │
└──────────┬──────────────────────────────────────────────────────────────┘
│
┌──────────▼──────────────────────────────────────────────────────────────┐
│ AIOPS MONITORING LAYER │
│ K8s 실시간 이벤트 (Prometheus/Alertmanager/K8s API) │
│ ──► LangGraph Agent │
│ ├─ Node 1: 이벤트 수집 & 증상 분석 │
│ ├─ Node 2: RAG 검색 (Milvus - 관련 SOP/Library) │
│ ├─ Node 3: 근본 원인 분석 (Root Cause Analysis) │
│ ├─ Node 4: 해결책 생성 & 검증 │
│ └─ Node 5: 리스크 예측 (Proactive Risk Detection) │
│ ──► Lens (로컬 PC) + 사내 LLM API 연동 │
└─────────────────────────────────────────────────────────────────────────┘
모든 문서는 아래 4단계 상태를 가지며, LLM과 자동화 파이프라인이 상태를 관리합니다.
draft ──► active ──► deprecated ──► archive
│
(180일 미검증 시 자동 알림)
│
(365일 후 archive 자동 이동)
---
# === 식별 및 분류 ===
id: "SOP-CIL-001" # 문서 고유 ID (자동 생성)
title: "Cilium BGP Control Plane 장애 대응"
category: "SOP" # SOP | Library | Architecture | Reports
tech_stack: ["Cilium", "BGP", "K8s"]
sub_category: "Network"
# === 생애주기 관리 (Lifecycle) ===
status: "active" # draft | active | deprecated | archive
created_at: "2026-01-15"
last_verified_at: "2026-04-28" # 기술 검증일 (180일 미갱신 시 알림)
verified_by: "platform-lead"
applies_to_version: "cilium>=1.15" # 유효 버전 범위
expires_at: null # 명시적 만료일 (null이면 자동 관리)
# === 출처 및 추적 ===
source: "confluence" # confluence | web | manual | ai-generated
source_url: "https://confluence.internal/pages/12345"
auto_classified: true
pipeline_version: "v1.2"
# === 운영 맥락 (AIOps) ===
severity: "Critical" # Critical | High | Medium | Low
environment: ["production", "staging"]
target_audience: "platform-engineer"
contains_code: true
code_languages: ["yaml", "shell"]
related_docs: ["ARCH-NET-002", "LIB-CIL-005"]
# === 검색 최적화 ===
tags: ["bgp-peering", "network-failure", "cilium", "troubleshooting"]
summary: "Cilium BGP 피어링 실패 시 원인 진단 및 단계별 복구 절차"
---
incremental_export.py)#!/usr/bin/env python3
"""
Confluence 24시간 증분 추출 → MinIO AIStor 저장
K8s CronJob에서 매일 새벽 02:00 실행
"""
import os
import datetime
import hashlib
import json
from atlassian import Confluence
import html2text
from minio import Minio
# ── 환경 변수 ──────────────────────────────────────────────
CONFLUENCE_URL = os.getenv("CONFLUENCE_URL")
CONFLUENCE_USER = os.getenv("CONFLUENCE_USER")
CONFLUENCE_TOKEN= os.getenv("CONFLUENCE_TOKEN")
PARENT_PAGE_ID = os.getenv("PARENT_PAGE_ID")
MINIO_URL = os.getenv("MINIO_URL")
MINIO_ACCESS = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET = os.getenv("MINIO_SECRET_KEY")
BUCKET_NAME = "confluence-wiki"
EXPORT_DIR = "/tmp/confluence_export"
HASH_STORE_PATH = "/tmp/hash_store.json"
confluence = Confluence(url=CONFLUENCE_URL, username=CONFLUENCE_USER, password=CONFLUENCE_TOKEN)
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=True)
h = html2text.HTML2Text()
h.ignore_links = False
h.body_width = 0 # 줄바꿈 없이 원형 유지
def load_hash_store():
"""이전 실행의 파일 해시값 로드 (중복 처리 방지)"""
try:
resp = minio_client.get_object(BUCKET_NAME, "metadata/hash_store.json")
return json.loads(resp.read())
except Exception:
return {}
def save_hash_store(store: dict):
data = json.dumps(store).encode()
from io import BytesIO
minio_client.put_object(BUCKET_NAME, "metadata/hash_store.json", BytesIO(data), len(data))
def extract_page(page_id: str, title: str) -> str:
"""Confluence 페이지 → Markdown 변환"""
detail = confluence.get_page_by_id(page_id, expand='body.storage,version,ancestors')
html_body = detail['body']['storage']['value']
version = detail['version']['number']
# 첨부 이미지 수집
attachments = confluence.get_attachments_from_content(page_id)
attachment_map = {}
for att in attachments.get('results', []):
att_title = att['title']
att_url = f"{CONFLUENCE_URL}{att['_links']['download']}"
attachment_map[att_title] = f"./assets/{att_title}"
md_content = h.handle(html_body)
# 첨부 이미지 경로를 상대 경로로 교체
for original_url, relative_path in attachment_map.items():
md_content = md_content.replace(original_url, relative_path)
# YAML Frontmatter 초안 삽입 (LLM이 나중에 완성)
safe_title = title.replace("/", "-").replace(" ", "-").lower()
frontmatter = f"""---
id: "PENDING-{page_id}"
title: "{title}"
source: "confluence"
source_page_id: "{page_id}"
source_version: {version}
status: "draft"
auto_classified: false
created_at: "{datetime.date.today()}"
last_verified_at: "{datetime.date.today()}"
---
"""
return frontmatter + md_content
def run_incremental_export():
os.makedirs(EXPORT_DIR, exist_ok=True)
hash_store = load_hash_store()
# 24시간 이내 변경 페이지만 CQL로 추출
cql = f'ancestor = {PARENT_PAGE_ID} AND lastModified >= now("-1d") ORDER BY lastModified DESC'
results = confluence.cql(cql).get('results', [])
print(f"[INFO] Found {len(results)} updated pages in last 24h")
exported_count = 0
for item in results:
page = item.get('content', {})
page_id = page.get('id')
title = page.get('title', 'untitled').replace("/", "-")
try:
md_content = extract_page(page_id, title)
content_hash = hashlib.sha256(md_content.encode()).hexdigest()
# 해시 비교 → 변경된 파일만 처리
if hash_store.get(page_id) == content_hash:
print(f"[SKIP] No change: {title}")
continue
# MinIO raw/ 버킷에 저장 (날짜별 경로)
date_prefix = datetime.date.today().strftime("%Y-%m-%d")
minio_path = f"raw/{date_prefix}/{title}.md"
data = md_content.encode('utf-8')
from io import BytesIO
minio_client.put_object(BUCKET_NAME, minio_path, BytesIO(data), len(data),
content_type="text/markdown")
hash_store[page_id] = content_hash
exported_count += 1
print(f"[OK] Exported: {title} → {minio_path}")
except Exception as e:
print(f"[ERROR] Failed to export {title}: {e}")
save_hash_store(hash_store)
print(f"[DONE] Exported {exported_count} pages to MinIO")
if __name__ == "__main__":
run_incremental_export()
processor.py)#!/usr/bin/env python3
"""
MinIO raw/ → LLM 분류/링크수정/메타데이터 생성 → wiki/ 폴더 구조 출력
Argo Workflows langchain-processor Pod 내부에서 실행
"""
import os
import re
import json
import uuid
import datetime
from minio import Minio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Optional
import yaml
MINIO_URL = os.getenv("MINIO_URL")
MINIO_ACCESS = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET = os.getenv("MINIO_SECRET_KEY")
LLM_ENDPOINT = os.getenv("LLM_ENDPOINT", "http://vllm-service.ai:8000/v1")
BUCKET_NAME = "confluence-wiki"
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=True)
# 폐쇄망: vLLM이 OpenAI 호환 API 제공
llm = ChatOpenAI(
model="llama-3-70b-instruct",
base_url=LLM_ENDPOINT,
api_key="internal",
temperature=0
)
# ── Pydantic 출력 스키마 ─────────────────────────────────────
class DocMetadata(BaseModel):
id_suffix: str = Field(description="문서 고유 식별자 접미사 (영문 대문자 3자리, 예: NET, K8S, MIO)")
category: str = Field(description="SOP | Library | Architecture | Reports")
sub_category: str = Field(description="Network | Storage | Compute | Security | Monitoring")
tech_stack: List[str] = Field(description="관련 기술 스택 리스트 (예: ['Cilium','BGP','K8s'])")
severity: str = Field(description="Critical | High | Medium | Low")
summary: str = Field(description="문서 핵심 내용 한 문장 요약 (한국어)")
tags: List[str] = Field(description="검색용 핵심 키워드 리스트 (소문자, kebab-case)")
applies_to_version: Optional[str] = Field(description="유효 버전 (예: 'cilium>=1.15'), 없으면 null")
contains_code: bool = Field(description="코드 블록 포함 여부")
code_languages: List[str] = Field(description="포함된 코드 언어 리스트")
optimized_filename: str = Field(description="파일명 (영문 kebab-case, .md 제외)")
related_concepts: List[str] = Field(description="관련 인프라 컴포넌트 또는 개념 리스트")
structured_llm = llm.with_structured_output(DocMetadata)
# ── 내부 링크 리졸버 ──────────────────────────────────────────
def resolve_internal_links(content: str, all_filenames: list) -> str:
"""[[WikiLink]] → [WikiLink](./WikiLink.md) 변환 (파일 존재 확인 포함)"""
def replace_link(match):
link_text = match.group(1)
safe_name = link_text.replace(" ", "-").lower()
# 파일 목록에서 존재 여부 확인
matched = next((f for f in all_filenames if safe_name in f.lower()), None)
if matched:
return f"[{link_text}](./{matched})"
return f"[{link_text}](./{safe_name}.md)" # 미래 생성 파일 대비
# [[Link|Alias]] 형식도 처리
content = re.sub(r'\[\[([^|\]]+)\|([^\]]+)\]\]',
lambda m: f"[{m.group(2)}](./{m.group(1).replace(' ','-').lower()}.md)",
content)
# [[Link]] 형식 처리
content = re.sub(r'\[\[([^\]]+)\]\]', replace_link, content)
return content
# ── 중복 문서 감지 ────────────────────────────────────────────
def check_duplicate_in_wiki(new_content: str, wiki_path: str) -> Optional[str]:
"""기존 wiki 파일과 의미적 유사도 체크 (간단 버전: 제목+핵심키워드 비교)"""
try:
objects = minio_client.list_objects(BUCKET_NAME, prefix="wiki/active/", recursive=True)
new_words = set(re.findall(r'\b[A-Za-z]{4,}\b', new_content[:2000]))
for obj in objects:
if not obj.object_name.endswith('.md'):
continue
resp = minio_client.get_object(BUCKET_NAME, obj.object_name)
existing = resp.read().decode('utf-8')
existing_words = set(re.findall(r'\b[A-Za-z]{4,}\b', existing[:2000]))
# 70% 이상 단어 겹침 → 중복 의심
if len(new_words) > 0:
overlap = len(new_words & existing_words) / len(new_words | existing_words)
if overlap > 0.7:
return obj.object_name
except Exception:
pass
return None
# ── 메인 분류 프로세서 ────────────────────────────────────────
def process_raw_documents():
"""MinIO raw/[today]/ 의 모든 MD 파일을 처리하여 wiki/active/ 로 저장"""
today = datetime.date.today().strftime("%Y-%m-%d")
prefix = f"raw/{today}/"
objects = list(minio_client.list_objects(BUCKET_NAME, prefix=prefix, recursive=True))
all_filenames = [os.path.basename(obj.object_name) for obj in objects]
print(f"[INFO] Processing {len(objects)} files from {prefix}")
category_counter = {"SOP": 0, "Library": 0, "Architecture": 0, "Reports": 0}
for obj in objects:
if not obj.object_name.endswith('.md'):
continue
try:
# 1. 파일 읽기
resp = minio_client.get_object(BUCKET_NAME, obj.object_name)
raw_content = resp.read().decode('utf-8')
# 기존 YAML Frontmatter 제거 (재생성 예정)
body = re.sub(r'^---\n.*?\n---\n', '', raw_content, flags=re.DOTALL).strip()
# 2. 내부 링크 수정
refined = resolve_internal_links(body, all_filenames)
# 3. LLM 메타데이터 분류
prompt = ChatPromptTemplate.from_template("""
당신은 플랫폼 엔지니어링 지식 관리 전문가입니다.
다음 인프라 기술 문서를 분석하여 정확한 메타데이터를 추출하세요.
문서 컨텍스트: K8s 1,000노드 클러스터, Cilium CNI, MinIO AIStor 환경
문서 내용 (앞 2000자):
{content}
""")
chain = prompt | structured_llm
meta: DocMetadata = chain.invoke({"content": refined[:2000]})
# 4. 카테고리별 ID 생성
cat_short = {"SOP": "SOP", "Library": "LIB", "Architecture": "ARCH", "Reports": "RPT"}
cat_key = cat_short.get(meta.category, "DOC")
category_counter[meta.category] = category_counter.get(meta.category, 0) + 1
doc_id = f"{cat_key}-{meta.id_suffix}-{category_counter.get(meta.category, 1):03d}"
# 5. 완전한 YAML Frontmatter 생성
frontmatter = {
"id": doc_id,
"title": meta.optimized_filename.replace("-", " ").title(),
"category": meta.category,
"sub_category": meta.sub_category,
"tech_stack": meta.tech_stack,
"status": "active",
"severity": meta.severity,
"summary": meta.summary,
"tags": meta.tags,
"applies_to_version": meta.applies_to_version,
"contains_code": meta.contains_code,
"code_languages": meta.code_languages,
"related_docs": [],
"related_concepts": meta.related_concepts,
"source": "confluence",
"auto_classified": True,
"created_at": str(datetime.date.today()),
"last_verified_at": str(datetime.date.today()),
"verified_by": "auto-pipeline",
"environment": ["production"],
"expires_at": None,
"pipeline_version": "v1.2"
}
final_content = f"---\n{yaml.dump(frontmatter, allow_unicode=True, default_flow_style=False)}---\n\n{refined}"
# 6. 중복 체크
duplicate_path = check_duplicate_in_wiki(refined, obj.object_name)
if duplicate_path:
# 기존 파일을 deprecated로 상태 변경
print(f"[WARN] Duplicate detected: {duplicate_path} → marking as deprecated")
_deprecate_document(duplicate_path)
# 7. wiki/active/[Category]/[SubCat]/ 경로로 저장
target_path = f"wiki/active/{meta.category}/{meta.sub_category}/{meta.optimized_filename}.md"
data = final_content.encode('utf-8')
from io import BytesIO
minio_client.put_object(BUCKET_NAME, target_path, BytesIO(data), len(data),
content_type="text/markdown")
print(f"[OK] Classified: {obj.object_name} → {target_path} [{doc_id}]")
except Exception as e:
print(f"[ERROR] Failed to process {obj.object_name}: {e}")
import traceback; traceback.print_exc()
def _deprecate_document(minio_path: str):
"""기존 문서의 status를 deprecated로 변경"""
try:
resp = minio_client.get_object(BUCKET_NAME, minio_path)
content = resp.read().decode('utf-8')
content = re.sub(r'status: "active"', 'status: "deprecated"', content)
data = content.encode('utf-8')
from io import BytesIO
minio_client.put_object(BUCKET_NAME, minio_path, BytesIO(data), len(data))
except Exception as e:
print(f"[ERROR] Failed to deprecate {minio_path}: {e}")
if __name__ == "__main__":
process_raw_documents()
sync_to_git.sh)#!/bin/bash
# MinIO wiki/ 내용을 Git 저장소에 동기화
# Argo Workflows git-sync Pod에서 실행
set -euo pipefail
GIT_REPO_URL="https://${GIT_TOKEN}@${GIT_HOST}/platform/llm-wiki.git"
CLONE_DIR="/workspace/llm-wiki"
BUCKET="confluence-wiki"
echo "[STEP 1] Git clone or pull"
if [ ! -d "$CLONE_DIR/.git" ]; then
git clone "$GIT_REPO_URL" "$CLONE_DIR"
fi
cd "$CLONE_DIR"
git config user.email "aiops-bot@internal.com"
git config user.name "AIOps Pipeline Bot"
git pull origin main --rebase
echo "[STEP 2] Sync MinIO wiki/ → local"
mc alias set myminio "${MINIO_URL}" "${MINIO_ACCESS_KEY}" "${MINIO_SECRET_KEY}"
mc mirror myminio/confluence-wiki/wiki/ "$CLONE_DIR/wiki/" --overwrite
echo "[STEP 3] Archive lifecycle check (365일 초과 deprecated 문서 자동 이동)"
python3 /app/lifecycle_manager.py --action archive --clone-dir "$CLONE_DIR"
echo "[STEP 4] Git commit & push"
git add wiki/
CHANGED=$(git diff --cached --name-only | wc -l)
if [ "$CHANGED" -gt "0" ]; then
COMMIT_MSG="Auto-sync: ${CHANGED} docs updated [$(date +%Y-%m-%d %H:%M)] via AIOps Pipeline"
git commit -m "$COMMIT_MSG"
git push origin main
echo "[OK] Pushed $CHANGED changed files to Git"
else
echo "[SKIP] No changes detected, nothing to push"
fi
echo "[DONE] Git sync completed"
lifecycle_manager.py)#!/usr/bin/env python3
"""
문서 생애주기 자동 관리
- 180일 미검증 → Slack 알림
- 365일 deprecated → archive 자동 이동
- Argo CronJob 또는 Git sync 후 호출
"""
import os
import re
import yaml
import datetime
import argparse
import requests
from pathlib import Path
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
ARCHIVE_DAYS = 365
ALERT_DAYS = 180
def parse_frontmatter(content: str) -> dict:
match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
if match:
try:
return yaml.safe_load(match.group(1)) or {}
except Exception:
return {}
return {}
def update_frontmatter(content: str, updates: dict) -> str:
match = re.match(r'^(---\n)(.*?)(\n---)', content, re.DOTALL)
if not match:
return content
try:
fm = yaml.safe_load(match.group(2)) or {}
fm.update(updates)
new_fm = yaml.dump(fm, allow_unicode=True, default_flow_style=False).strip()
return f"---\n{new_fm}\n---{content[match.end():]}"
except Exception:
return content
def send_slack_alert(doc_id: str, title: str, last_verified: str, file_path: str):
if not SLACK_WEBHOOK:
print(f"[ALERT] {doc_id} ({title}): 검증 기간 초과 - {last_verified}")
return
msg = {
"blocks": [
{"type": "section", "text": {"type": "mrkdwn",
"text": f":warning: *문서 검증 기간 초과 알림*\n"
f"*ID:* `{doc_id}`\n*제목:* {title}\n"
f"*마지막 검증:* {last_verified}\n*경로:* `{file_path}`"}},
{"type": "actions", "elements": [
{"type": "button", "text": {"type": "plain_text", "text": "검증 완료로 표시"},
"style": "primary", "value": doc_id},
{"type": "button", "text": {"type": "plain_text", "text": "아카이브"},
"style": "danger", "value": f"archive:{doc_id}"}
]}
]
}
requests.post(SLACK_WEBHOOK, json=msg)
def run_lifecycle_check(clone_dir: str, action: str = "check"):
today = datetime.date.today()
wiki_path = Path(clone_dir) / "wiki" / "active"
archived_count = 0
alerted_count = 0
for md_file in wiki_path.rglob("*.md"):
content = md_file.read_text(encoding='utf-8')
fm = parse_frontmatter(content)
if not fm:
continue
status = fm.get("status", "active")
last_verified_str = str(fm.get("last_verified_at", ""))
doc_id = fm.get("id", "UNKNOWN")
title = fm.get("title", str(md_file.name))
try:
last_verified = datetime.date.fromisoformat(last_verified_str)
except (ValueError, TypeError):
last_verified = today - datetime.timedelta(days=ALERT_DAYS + 1)
age_days = (today - last_verified).days
# 365일 초과 deprecated → archive 폴더로 자동 이동
if status == "deprecated" and age_days >= ARCHIVE_DAYS and action == "archive":
archive_target = Path(clone_dir) / "wiki" / "archive" / md_file.relative_to(wiki_path)
archive_target.parent.mkdir(parents=True, exist_ok=True)
updated = update_frontmatter(content, {
"status": "archive",
"archived_at": str(today),
"archive_reason": f"Auto-archived after {age_days} days without verification"
})
archive_target.write_text(updated, encoding='utf-8')
md_file.unlink()
archived_count += 1
print(f"[ARCHIVE] {doc_id}: {md_file.name} → archive/")
# 180일 초과 active → Slack 경고
elif status == "active" and age_days >= ALERT_DAYS:
if action in ("check", "alert"):
send_slack_alert(doc_id, title, last_verified_str, str(md_file))
alerted_count += 1
print(f"[LIFECYCLE] archived={archived_count}, alerted={alerted_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["check", "alert", "archive"], default="check")
parser.add_argument("--clone-dir", default="/workspace/llm-wiki")
args = parser.parse_args()
run_lifecycle_check(args.clone_dir, args.action)
indexer.py)#!/usr/bin/env python3
"""
Git wiki/ → Milvus Vector DB 증분 인덱싱
LangChain RecordManager로 중복/삭제 자동 동기화
Hybrid Search: Dense(Embedding) + Sparse(BM25) 지원
"""
import os
import re
from pathlib import Path
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings # 폐쇄망용
from langchain_community.vectorstores import Milvus
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
import yaml
WIKI_DIR = os.getenv("WIKI_DIR", "/workspace/llm-wiki/wiki/active")
MILVUS_HOST = os.getenv("MILVUS_HOST", "milvus-service.storage")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
MODEL_PATH = os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3") # 폐쇄망 로컬 모델
COLLECTION = "platform_ops_wiki"
DB_URL = os.getenv("RECORD_MANAGER_DB", "sqlite:////data/record_manager.db")
# ── 임베딩 모델 (폐쇄망: HuggingFace 로컬 모델) ──────────────
embeddings = HuggingFaceEmbeddings(
model_name=MODEL_PATH,
model_kwargs={'device': 'cuda'},
encode_kwargs={'normalize_embeddings': True}
)
# ── Vector Store ──────────────────────────────────────────────
vector_store = Milvus(
embedding_function=embeddings,
collection_name=COLLECTION,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params={"metric_type": "COSINE"},
)
# ── RecordManager: 중복/삭제 자동 동기화 ──────────────────────
record_manager = SQLRecordManager(namespace=f"milvus/{COLLECTION}", db_url=DB_URL)
record_manager.create_schema()
# ── 마크다운 헤더 기반 청킹 ────────────────────────────────────
HEADERS_TO_SPLIT = [("#", "h1"), ("##", "h2"), ("###", "h3")]
text_splitter = MarkdownHeaderTextSplitter(HEADERS_TO_SPLIT, strip_headers=False)
def parse_frontmatter(content: str) -> dict:
match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
if match:
try:
return yaml.safe_load(match.group(1)) or {}
except Exception:
return {}
return {}
def load_documents(wiki_dir: str) -> list[Document]:
"""wiki/active/ 모든 MD 파일 → LangChain Document 리스트 (메타데이터 포함)"""
docs = []
for md_path in Path(wiki_dir).rglob("*.md"):
content = md_path.read_text(encoding='utf-8')
fm = parse_frontmatter(content)
# status가 active인 문서만 인덱싱
if fm.get("status") not in ("active", None):
continue
# YAML Frontmatter 제거 후 본문만 청킹
body = re.sub(r'^---\n.*?\n---\n', '', content, flags=re.DOTALL).strip()
chunks = text_splitter.split_text(body)
for chunk in chunks:
# 청킹된 각 조각에 문서 메타데이터 부착
chunk.metadata.update({
"doc_id": fm.get("id", ""),
"category": fm.get("category", ""),
"sub_category": fm.get("sub_category", ""),
"tech_stack": ",".join(fm.get("tech_stack", [])),
"severity": fm.get("severity", ""),
"status": fm.get("status", "active"),
"tags": ",".join(fm.get("tags", [])),
"summary": fm.get("summary", ""),
"last_verified":str(fm.get("last_verified_at", "")),
"file_path": str(md_path.relative_to(wiki_dir)),
"source": fm.get("source", ""),
})
docs.append(chunk)
return docs
def run_indexing():
print(f"[INFO] Loading documents from {WIKI_DIR}")
docs = load_documents(WIKI_DIR)
print(f"[INFO] Total chunks to index: {len(docs)}")
# cleanup="incremental": 변경된 문서만 업데이트, 삭제된 문서는 DB에서도 삭제
result = index(
docs,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="doc_id"
)
print(f"[DONE] Indexing result: {result}")
if __name__ == "__main__":
run_indexing()
aiops_agent.py)#!/usr/bin/env python3
"""
K8s 실시간 이벤트 + RAG(Wiki) 기반 AIOps 에이전트
기능:
1. 이벤트 수집 & 증상 분석
2. 관련 SOP/Library RAG 검색
3. 근본 원인 분석 (Root Cause Analysis)
4. 해결책 생성 & 실행 가능 명령어 제시
5. 리스크 예측 (Proactive Risk Detection)
"""
import os
import json
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Milvus
from kubernetes import client, config
LLM_ENDPOINT = os.getenv("LLM_ENDPOINT", "http://vllm-service.ai:8000/v1")
MILVUS_HOST = os.getenv("MILVUS_HOST", "milvus-service.storage")
MODEL_PATH = os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3")
COLLECTION = "platform_ops_wiki"
llm = ChatOpenAI(model="llama-3-70b-instruct", base_url=LLM_ENDPOINT, api_key="internal", temperature=0)
embeddings = HuggingFaceEmbeddings(model_name=MODEL_PATH, model_kwargs={'device': 'cpu'})
vector_store = Milvus(
embedding_function=embeddings,
collection_name=COLLECTION,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
)
# ── K8s 클라이언트 초기화 ─────────────────────────────────────
try:
config.load_incluster_config()
except Exception:
config.load_kube_config()
k8s_core = client.CoreV1Api()
k8s_apps = client.AppsV1Api()
# ── 에이전트 상태 정의 ────────────────────────────────────────
class AIOpsState(TypedDict):
messages: Annotated[list, add_messages]
event_raw: str # 원본 K8s 이벤트/알람
symptoms: str # 분석된 증상
search_query: str # RAG 검색 쿼리
retrieved_docs: List[dict] # 검색된 관련 문서
root_cause: str # 근본 원인 분석 결과
solution: str # 제안된 해결책 + 명령어
risk_prediction: str # 예측된 리스크
k8s_context: dict # K8s 클러스터 실시간 상태
retry_count: int # 재시도 횟수 (루프 제어)
confidence: float # 분석 신뢰도 (0~1)
# ── Node 1: K8s 컨텍스트 수집 ──────────────────────────────────
def collect_k8s_context(state: AIOpsState) -> AIOpsState:
"""이벤트와 관련된 K8s 실시간 상태 수집"""
ctx = {}
event_text = state["event_raw"]
try:
# 네임스페이스 추출 시도
ns_match = __import__('re').search(r'namespace[=: ]+(\S+)', event_text, re.IGNORECASE)
namespace = ns_match.group(1) if ns_match else "kube-system"
# 최근 이벤트 수집
events = k8s_core.list_namespaced_event(
namespace=namespace,
limit=20,
field_selector="type=Warning"
)
ctx["recent_warnings"] = [
{"reason": e.reason, "message": e.message, "object": e.involved_object.name}
for e in events.items
]
# 비정상 파드 수집
pods = k8s_core.list_namespaced_pod(namespace=namespace)
ctx["unhealthy_pods"] = [
{"name": p.metadata.name, "phase": p.status.phase,
"conditions": [c.type for c in (p.status.conditions or []) if not c.status == "True"]}
for p in pods.items
if p.status.phase not in ("Running", "Succeeded")
]
# 노드 상태
nodes = k8s_core.list_node()
ctx["node_summary"] = {
"total": len(nodes.items),
"not_ready": [n.metadata.name for n in nodes.items
if not any(c.type == "Ready" and c.status == "True"
for c in (n.status.conditions or []))]
}
except Exception as e:
ctx["error"] = str(e)
state["k8s_context"] = ctx
return state
# ── Node 2: 증상 분석 ──────────────────────────────────────────
def analyze_symptoms(state: AIOpsState) -> AIOpsState:
"""이벤트 + K8s 컨텍스트 → 증상 구조화"""
prompt = f"""
당신은 K8s 인프라 전문가입니다. 다음 이벤트와 클러스터 상태를 분석하여 정확한 증상을 파악하세요.
## 원본 이벤트/알람:
{state['event_raw']}
## 실시간 K8s 상태:
{json.dumps(state['k8s_context'], ensure_ascii=False, indent=2)}
다음 형식으로 분석하세요:
1. **주요 증상**: 가장 명확한 이상 현상
2. **영향 범위**: 영향받는 컴포넌트/네임스페이스/노드
3. **심각도**: Critical/High/Medium/Low
4. **검색 키워드**: 관련 문서 검색에 쓸 핵심 키워드 (3~5개)
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["symptoms"] = response.content
# 검색 쿼리 추출
keywords = __import__('re').findall(r'\*\*검색 키워드\*\*:?\s*(.+)', response.content)
state["search_query"] = keywords[0] if keywords else state["event_raw"][:200]
return state
# ── Node 3: RAG 검색 (Hybrid Search) ─────────────────────────
def retrieve_knowledge(state: AIOpsState) -> AIOpsState:
"""Milvus Hybrid Search: 의미 검색 + 메타데이터 필터링"""
query = state["search_query"]
# 카테고리 필터: SOP 우선 검색
sop_results = vector_store.similarity_search_with_score(
query,
k=3,
expr='category == "SOP" and status == "active"'
)
# Library/Architecture도 추가 검색
lib_results = vector_store.similarity_search_with_score(
query,
k=3,
expr='(category == "Library" or category == "Architecture") and status == "active"'
)
all_results = sop_results + lib_results
# 유사도 점수 기준 정렬 (낮을수록 유사)
all_results.sort(key=lambda x: x[1])
retrieved = []
for doc, score in all_results[:5]:
retrieved.append({
"doc_id": doc.metadata.get("doc_id"),
"category": doc.metadata.get("category"),
"severity": doc.metadata.get("severity"),
"content": doc.page_content,
"score": round(score, 4),
"file_path":doc.metadata.get("file_path"),
})
state["retrieved_docs"] = retrieved
return state
# ── Node 4: 근본 원인 분석 ────────────────────────────────────
def analyze_root_cause(state: AIOpsState) -> AIOpsState:
"""증상 + 검색 결과 → 근본 원인 분석 + 신뢰도 평가"""
docs_context = "\n\n---\n".join([
f"[{d['doc_id']}] (유사도: {d['score']})\n{d['content']}"
for d in state["retrieved_docs"]
])
prompt = f"""
당신은 1,000노드 K8s 클러스터 운영 전문가입니다.
## 분석된 증상:
{state['symptoms']}
## 참조 문서 (SOP/Library):
{docs_context}
## K8s 실시간 상태:
{json.dumps(state['k8s_context'], ensure_ascii=False)}
근본 원인을 분석하세요:
1. **근본 원인**: 가장 가능성 높은 원인 (1~2가지)
2. **원인 근거**: 증상과 참조 문서를 연결한 논리적 근거
3. **관련 컴포넌트**: 직접 관련된 K8s/Cilium/MinIO 컴포넌트
4. **분석 신뢰도**: 0.0~1.0 (참조 문서의 관련성이 낮으면 낮게 평가)
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["root_cause"] = response.content
# 신뢰도 추출
conf_match = __import__('re').search(r'분석 신뢰도.*?([0-9]\.[0-9]+)', response.content)
state["confidence"] = float(conf_match.group(1)) if conf_match else 0.5
return state
# ── Node 5: 해결책 생성 ────────────────────────────────────────
def generate_solution(state: AIOpsState) -> AIOpsState:
"""근본 원인 → 단계별 해결책 + 실행 가능한 명령어 생성"""
prompt = f"""
## 근본 원인 분석:
{state['root_cause']}
## 관련 SOP/Library 요약:
{chr(10).join([d['content'][:500] for d in state['retrieved_docs'][:2]])}
즉시 실행 가능한 해결책을 제시하세요:
1. **즉각 조치 (0~5분)**: 피해 최소화를 위한 긴급 조치
2. **단계별 복구 절차**: 순서대로 실행할 kubectl/cilium 명령어 포함
3. **검증 방법**: 복구 성공 여부를 확인하는 방법
4. **재발 방지**: 같은 이슈가 재발하지 않도록 하는 설정 변경
```bash
# 즉각 조치 명령어 예시
# (실제 리소스명으로 교체 필요)
주의사항: 이 명령어는 Production 환경에 적용 전 반드시 검토하세요.
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["solution"] = response.content
return state
def predict_risks(state: AIOpsState) -> AIOpsState:
"""현재 상태 기반 근시일 내 발생 가능한 리스크 예측"""
prompt = f"""
{state['symptoms']}
{json.dumps(state['k8s_context'], ensure_ascii=False)}
{chr(10).join([d['content'][:300] for d in state['retrieved_docs'][:3]])}
현재 상황을 방치하거나 부분 조치만 취했을 때 발생 가능한 리스크를 예측하세요:
리스크별 확률(%)과 영향도(Critical/High/Medium)도 표시하세요.
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["risk_prediction"] = response.content
return state
def should_retry_search(state: AIOpsState) -> str:
if state["confidence"] < 0.4 and state["retry_count"] < 2:
state["retry_count"] += 1
state["search_query"] = state["symptoms"][:300] # 더 넓은 쿼리로 재시도
print(f"[RETRY] Low confidence ({state['confidence']}), retrying search...")
return "retrieve_knowledge"
return "generate_solution"
def build_aiops_graph() -> StateGraph:
graph = StateGraph(AIOpsState)
graph.add_node("collect_k8s_context", collect_k8s_context)
graph.add_node("analyze_symptoms", analyze_symptoms)
graph.add_node("retrieve_knowledge", retrieve_knowledge)
graph.add_node("analyze_root_cause", analyze_root_cause)
graph.add_node("generate_solution", generate_solution)
graph.add_node("predict_risks", predict_risks)
graph.set_entry_point("collect_k8s_context")
graph.add_edge("collect_k8s_context", "analyze_symptoms")
graph.add_edge("analyze_symptoms", "retrieve_knowledge")
graph.add_edge("retrieve_knowledge", "analyze_root_cause")
# 조건부 엣지: 신뢰도 낮으면 재검색 루프
graph.add_conditional_edges("analyze_root_cause", should_retry_search,
{"retrieve_knowledge": "retrieve_knowledge",
"generate_solution": "generate_solution"})
graph.add_edge("generate_solution", "predict_risks")
graph.add_edge("predict_risks", END)
return graph.compile()
def analyze_event(event_text: str) -> dict:
"""외부에서 호출하는 메인 분석 함수"""
graph = build_aiops_graph()
initial_state = AIOpsState(
messages=[],
event_raw=event_text,
symptoms="",
search_query="",
retrieved_docs=[],
root_cause="",
solution="",
risk_prediction="",
k8s_context={},
retry_count=0,
confidence=0.0,
)
result = graph.invoke(initial_state)
return {
"symptoms": result["symptoms"],
"root_cause": result["root_cause"],
"solution": result["solution"],
"risk_prediction": result["risk_prediction"],
"confidence": result["confidence"],
"referenced_docs": [d["doc_id"] for d in result["retrieved_docs"]],
}
if name == "main":
# 테스트 이벤트
test_event = """
ALERT: CiliumNetworkPolicy not enforced
Namespace: production
Node: k8s-worker-042
Message: BGP peer 10.0.0.1 connection lost, routes not propagated
Timestamp: 2026-04-29T03:14:00Z
"""
result = analyze_event(test_event)
import json
print(json.dumps(result, ensure_ascii=False, indent=2))
---
### 3-7. Argo Workflows 전체 파이프라인 YAML
```yaml
# argo-pipeline.yaml
# kubectl apply -f argo-pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: llm-wiki-pipeline
namespace: platform-ops
spec:
schedule: "0 2 * * *" # 매일 새벽 2시
timezone: "Asia/Seoul"
concurrencyPolicy: Forbid # 중복 실행 방지
workflowSpec:
serviceAccountName: argo-wiki-sa
entrypoint: main-pipeline
# 공유 볼륨 (파이프라인 단계 간 데이터 공유)
volumes:
- name: workspace
persistentVolumeClaim:
claimName: wiki-pipeline-pvc
templates:
# ── 메인 DAG ───────────────────────────────────────────────
- name: main-pipeline
dag:
tasks:
- name: step1-extract
template: confluence-extractor
- name: step2-process
dependencies: [step1-extract]
template: langchain-processor
- name: step3-lifecycle
dependencies: [step2-process]
template: lifecycle-manager
- name: step4-git-sync
dependencies: [step3-lifecycle]
template: git-syncer
- name: step5-indexing
dependencies: [step4-git-sync]
template: vector-indexer
- name: step6-notify
dependencies: [step5-indexing]
template: slack-notifier
# ── Step 1: Confluence 증분 추출 ───────────────────────────
- name: confluence-extractor
container:
image: internal-reg.com/wiki-pipeline/extractor:v1.2
command: ["python", "incremental_export.py"]
resources:
requests: {cpu: "500m", memory: "1Gi"}
limits: {cpu: "2", memory: "2Gi"}
envFrom:
- secretRef:
name: wiki-pipeline-secrets
volumeMounts:
- name: workspace
mountPath: /workspace
# ── Step 2: LLM 분류 처리 (GPU 활용) ──────────────────────
- name: langchain-processor
container:
image: internal-reg.com/wiki-pipeline/processor:v1.2
command: ["python", "processor.py"]
resources:
requests: {cpu: "2", memory: "8Gi", "nvidia.com/gpu": "1"}
limits: {cpu: "8", memory: "16Gi", "nvidia.com/gpu": "1"}
nodeSelector:
accelerator: nvidia-gpu
envFrom:
- secretRef:
name: wiki-pipeline-secrets
env:
- name: LLM_ENDPOINT
value: "http://vllm-service.ai-namespace:8000/v1"
volumeMounts:
- name: workspace
mountPath: /workspace
# ── Step 3: 생애주기 관리 ──────────────────────────────────
- name: lifecycle-manager
container:
image: internal-reg.com/wiki-pipeline/processor:v1.2
command: ["python", "lifecycle_manager.py", "--action", "archive",
"--clone-dir", "/workspace/llm-wiki"]
resources:
requests: {cpu: "200m", memory: "512Mi"}
limits: {cpu: "1", memory: "1Gi"}
envFrom:
- secretRef:
name: wiki-pipeline-secrets
volumeMounts:
- name: workspace
mountPath: /workspace
# ── Step 4: Git 동기화 ─────────────────────────────────────
- name: git-syncer
container:
image: internal-reg.com/wiki-pipeline/git-tools:v1.0
command: ["/bin/bash", "/app/sync_to_git.sh"]
resources:
requests: {cpu: "200m", memory: "256Mi"}
envFrom:
- secretRef:
name: wiki-pipeline-secrets
volumeMounts:
- name: workspace
mountPath: /workspace
# ── Step 5: Vector DB 인덱싱 ──────────────────────────────
- name: vector-indexer
container:
image: internal-reg.com/wiki-pipeline/indexer:v1.2
command: ["python", "indexer.py"]
resources:
requests: {cpu: "1", memory: "4Gi"}
limits: {cpu: "4", memory: "8Gi"}
env:
- name: WIKI_DIR
value: "/workspace/llm-wiki/wiki/active"
- name: EMBEDDING_MODEL_PATH
value: "/models/bge-m3"
envFrom:
- secretRef:
name: wiki-pipeline-secrets
volumeMounts:
- name: workspace
mountPath: /workspace
- name: models
mountPath: /models
readOnly: true
volumes:
- name: models
hostPath:
path: /data/models
# ── Step 6: 완료 알림 ─────────────────────────────────────
- name: slack-notifier
container:
image: curlimages/curl:latest
command: [sh, -c]
args:
- |
curl -X POST $SLACK_WEBHOOK_URL \
-H 'Content-type: application/json' \
-d '{"text":"✅ LLM Wiki 파이프라인 완료 ('"$(date '+%Y-%m-%d %H:%M')"')\n자세한 내용: Argo UI 확인"}'
envFrom:
- secretRef:
name: wiki-pipeline-secrets
---
# K8s Secret (실제 배포 시 Vault 또는 Sealed Secrets 권장)
apiVersion: v1
kind: Secret
metadata:
name: wiki-pipeline-secrets
namespace: platform-ops
type: Opaque
stringData:
CONFLUENCE_URL: "https://your-domain.atlassian.net/wiki"
CONFLUENCE_USER: "admin@company.com"
CONFLUENCE_TOKEN: "your-api-token"
PARENT_PAGE_ID: "12345678"
MINIO_URL: "minio.storage.svc.cluster.local:9000"
MINIO_ACCESS_KEY: "minio-user"
MINIO_SECRET_KEY: "minio-password"
GIT_TOKEN: "your-git-token"
GIT_HOST: "bitbucket.internal.com"
MILVUS_HOST: "milvus.storage.svc.cluster.local"
SLACK_WEBHOOK_URL:"https://hooks.slack.com/services/..."
api_server.py)Lens 로컬 PC 및 모니터링 시스템에서 에이전트를 호출하는 FastAPI 엔드포인트입니다.
#!/usr/bin/env python3
"""
AIOps Agent FastAPI 서버
- Lens(로컬 PC) 에서 HTTP 호출
- Alertmanager Webhook 수신
- 모니터링 시스템 연동
"""
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
from aiops_agent import analyze_event # 위의 LangGraph 에이전트 임포트
import uvicorn
app = FastAPI(title="AIOps Agent API", version="1.0.0")
class EventRequest(BaseModel):
event_text: str
source: str = "manual" # manual | alertmanager | prometheus | lens
namespace: Optional[str] = None
severity: Optional[str] = None
class AlertmanagerWebhook(BaseModel):
"""Prometheus Alertmanager Webhook 형식"""
alerts: list
groupLabels: dict
commonAnnotations: dict
# ── 즉시 분석 엔드포인트 ─────────────────────────────────────
@app.post("/analyze")
async def analyze(req: EventRequest):
"""이벤트 텍스트 즉시 분석 (Lens 또는 사내 모니터링 시스템에서 호출)"""
result = await asyncio.get_event_loop().run_in_executor(
None, analyze_event, req.event_text
)
return {
"status": "ok",
"source": req.source,
"analysis": result
}
# ── Alertmanager Webhook 수신 ─────────────────────────────────
@app.post("/webhook/alertmanager")
async def alertmanager_webhook(payload: AlertmanagerWebhook, background_tasks: BackgroundTasks):
"""Prometheus Alertmanager에서 실시간 알람 수신 → 백그라운드 분석"""
for alert in payload.alerts:
if alert.get("status") == "firing":
event_text = (
f"ALERT: {alert.get('labels', {}).get('alertname', 'Unknown')}\n"
f"Namespace: {alert.get('labels', {}).get('namespace', 'unknown')}\n"
f"Severity: {alert.get('labels', {}).get('severity', 'unknown')}\n"
f"Summary: {alert.get('annotations', {}).get('summary', '')}\n"
f"Description: {alert.get('annotations', {}).get('description', '')}"
)
background_tasks.add_task(_analyze_and_notify, event_text)
return {"status": "received"}
async def _analyze_and_notify(event_text: str):
"""백그라운드: 분석 후 Slack으로 결과 전송"""
import httpx, os
result = await asyncio.get_event_loop().run_in_executor(None, analyze_event, event_text)
slack_msg = {
"blocks": [
{"type": "header", "text": {"type": "plain_text",
"text": f"🚨 AIOps 장애 분석 결과 (신뢰도: {result['confidence']:.0%})"}},
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*📋 증상 요약*\n{result['symptoms'][:500]}"}},
{"type": "divider"},
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*🔍 근본 원인*\n{result['root_cause'][:600]}"}},
{"type": "divider"},
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*🛠 해결책*\n{result['solution'][:600]}"}},
{"type": "divider"},
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*⚠️ 리스크 예측*\n{result['risk_prediction'][:400]}"}},
{"type": "context", "elements": [
{"type": "mrkdwn",
"text": f"참조 문서: {', '.join(result['referenced_docs'])}"}
]}
]
}
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if webhook_url:
async with httpx.AsyncClient() as client:
await client.post(webhook_url, json=slack_msg)
# ── 리스크 예측 전용 엔드포인트 ──────────────────────────────
@app.get("/risk-scan")
async def proactive_risk_scan():
"""정기 리스크 스캔 (cron으로 매시간 호출 가능)"""
from kubernetes import client as k8s, config
config.load_incluster_config()
v1 = k8s.CoreV1Api()
# 경고 이벤트 수집
events = v1.list_event_for_all_namespaces(
field_selector="type=Warning", limit=50
)
event_summary = "\n".join([
f"{e.involved_object.namespace}/{e.involved_object.name}: {e.reason} - {e.message}"
for e in events.items
])
if not event_summary:
return {"status": "healthy", "message": "No warning events detected"}
result = await asyncio.get_event_loop().run_in_executor(
None, analyze_event, f"정기 리스크 스캔\n수집된 경고 이벤트:\n{event_summary}"
)
return {"status": "risks_detected", "analysis": result}
# ── Wiki 검색 엔드포인트 ──────────────────────────────────────
@app.get("/search")
async def search_wiki(q: str, category: Optional[str] = None, limit: int = 5):
"""Lens에서 직접 Wiki 검색 (RAG 없이 순수 검색)"""
from langchain_community.vectorstores import Milvus as MilvusVS
from langchain_huggingface import HuggingFaceEmbeddings
import os
emb = HuggingFaceEmbeddings(model_name=os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3"))
vs = MilvusVS(embedding_function=emb, collection_name="platform_ops_wiki",
connection_args={"host": os.getenv("MILVUS_HOST"), "port": "19530"})
expr = f'category == "{category}" and status == "active"' if category else 'status == "active"'
results = vs.similarity_search_with_score(q, k=limit, expr=expr)
return {"results": [
{"doc_id": r.metadata.get("doc_id"), "score": s,
"summary": r.metadata.get("summary"), "file": r.metadata.get("file_path")}
for r, s in results
]}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
Lens에서 사내 AIOps API 서버와 연동하는 방법입니다.
# lens-extension-config.yaml
# Lens > Extensions > AIOps 설정
aiops:
apiEndpoint: "https://aiops-agent.internal.com"
alertWebhook: "/webhook/alertmanager"
searchEndpoint: "/search"
# 사내 LLM API (Lens에서 직접 쿼리)
llmApi:
endpoint: "https://llm-internal.company.com/v1"
model: "llama-3-70b"
authHeader: "Bearer ${INTERNAL_LLM_TOKEN}"
# alertmanager-config.yaml (기존 Alertmanager에 webhook 추가)
route:
group_by: ['alertname', 'namespace']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
receiver: 'slack-default'
routes:
- match:
severity: critical
receiver: 'aiops-agent' # Critical 알람은 AIOps 에이전트로
receivers:
- name: 'aiops-agent'
webhook_configs:
- url: 'http://aiops-agent-svc.platform-ops:8080/webhook/alertmanager'
send_resolved: false
- name: 'slack-default'
slack_configs:
- api_url: 'https://hooks.slack.com/services/...'
channel: '#platform-alerts'
llm-wiki/ ← Git 저장소 루트
├── raw/ ← Confluence 원본 (수정 금지)
│ └── 2026-04-29/
│ ├── Cilium-BGP-Guide.md
│ └── MinIO-Scaling.md
│
├── wiki/
│ ├── active/ ← 현재 운영 중인 지식 (RAG 대상)
│ │ ├── SOP/
│ │ │ ├── Network/
│ │ │ │ └── cilium-bgp-peering-recovery.md
│ │ │ └── Storage/
│ │ │ └── minio-node-expansion.md
│ │ ├── Library/
│ │ │ ├── Cilium/
│ │ │ └── K8s/
│ │ ├── Architecture/
│ │ └── Reports/
│ │
│ └── archive/ ← 폐기된 구버전 문서 (RAG 제외)
│ └── SOP/Network/
│ └── cilium-1.14-bgp-old.md
│
├── scripts/ ← 파이프라인 스크립트
│ ├── incremental_export.py
│ ├── processor.py
│ ├── lifecycle_manager.py
│ ├── indexer.py
│ └── sync_to_git.sh
│
└── .gitlab-ci.yml ← GitLab 전환 후 사용할 CI 설정
# .gitlab-ci.yml
stages:
- validate
- index
- notify
validate-lifecycle:
stage: validate
image: internal-reg.com/wiki-pipeline/processor:v1.2
script:
- python scripts/lifecycle_manager.py --action check --clone-dir .
rules:
- if: $CI_COMMIT_BRANCH == "main"
changes:
- "wiki/**/*"
vector-indexing:
stage: index
image: internal-reg.com/wiki-pipeline/indexer:v1.2
script:
- python scripts/indexer.py
variables:
WIKI_DIR: "./wiki/active"
rules:
- if: $CI_COMMIT_BRANCH == "main"
changes:
- "wiki/active/**/*"
notify-update:
stage: notify
image: curlimages/curl
script:
- |
CHANGED=$(git diff HEAD~1 --name-only wiki/ | wc -l)
curl -X POST $SLACK_WEBHOOK_URL \
-d "{\"text\":\"📚 Wiki 업데이트: ${CHANGED}개 문서 변경 (${CI_COMMIT_SHORT_SHA})\"}"
rules:
- if: $CI_COMMIT_BRANCH == "main"
| 단계 | 항목 | 상태 |
|---|---|---|
| 인프라 | MinIO AIStor 버킷 생성 (confluence-wiki) | □ |
| 인프라 | Milvus K8s Operator 설치 | □ |
| 인프라 | vLLM Pod 배포 (GPU 노드) | □ |
| 인프라 | BGE-M3 임베딩 모델 반입 (폐쇄망) | □ |
| 파이프라인 | Argo Workflows CronWorkflow 배포 | □ |
| 파이프라인 | K8s Secret 생성 (Confluence/Git/MinIO 인증정보) | □ |
| 파이프라인 | 첫 전체 추출 실행 (manual trigger) | □ |
| 인덱싱 | Vector DB 초기 인덱싱 완료 확인 | □ |
| 모니터링 | AIOps API 서버 배포 | □ |
| 모니터링 | Alertmanager Webhook 연동 | □ |
| 로컬 | Lens AIOps Extension 설정 | □ |
| 거버넌스 | Slack 알림 채널 설정 (180일 검증 알림) | □ |
| 전환 | GitLab .gitlab-ci.yml 준비 (6개월 후) | □ |
문서 ID: ARCH-OPS-001 | 작성일: 2026-04-29 | 버전: v1.0 | 검토자: Platform Lead