26A29b

Young-Kyoo Kim·2026년 4월 28일

규모가 크고 중요한 설계 문서이므로, 전체 아키텍처와 핵심 코드를 포함한 단일 마크다운 파일로 작성합니다.전체 아키텍처 설계서와 핵심 코드를 담은 문서를 작성했습니다.


문서 구성 요약

8개 핵심 코드 모듈이 포함되어 있으며, 각각의 역할과 연결 구조는 다음과 같습니다.

데이터 수집 → 정제 → 저장 흐름:

  • incremental_export.py — Confluence CQL 증분 추출, 파일 해시 비교로 중복 방지, MinIO raw/ 저장
  • processor.py — LangChain + 로컬 LLM(vLLM)으로 분류·링크수정·YAML 메타데이터 자동 생성, 중복 문서 감지 및 deprecated 처리
  • sync_to_git.sh — MinIO wiki/ → Git 동기화, lifecycle_manager 연계 호출
  • lifecycle_manager.py — 180일 미검증 시 Slack 알림, 365일 경과 시 archive/ 자동 이동

인덱싱 및 검색:

  • indexer.pyRecordManager로 증분 인덱싱(변경분만), 삭제된 문서 자동 정리, Milvus Hybrid Search

AIOps 모니터링:

  • aiops_agent.py — LangGraph 6-Node 워크플로우. 신뢰도 낮으면 자동 재검색 루프, K8s 실시간 컨텍스트 수집, 근본 원인 분석 → 해결책 → 리스크 예측까지 순차 실행
  • api_server.py — FastAPI 서버. Alertmanager Webhook 수신, Lens에서 HTTP 호출, 정기 리스크 스캔 엔드포인트 제공
  • argo-pipeline.yaml — 전체 6단계 파이프라인을 CronWorkflow로 오케스트레이션

생애주기 관리는 YAML status 필드(draft → active → deprecated → archive)를 중심으로 전 단계에 걸쳐 반영했고, GitLab 전환 대비 .gitlab-ci.yml도 포함되어 있습니다.

===

AIOps Platform + LLM Wiki 전체 아키텍처 설계서

대상 환경: K8s 1,000노드 클러스터 / Cilium / MinIO AIStor / Airgapped / Bitbucket → GitLab 전환 예정
목표: Confluence 문서를 LLM Wiki로 자동 변환하고, 실시간 K8s 상태와 결합하여 장애 원인 분석·해결책 제시·리스크 예측이 가능한 완전 자동화 AIOps 시스템 구축


1. 전체 시스템 아키텍처 (High-Level Overview)

┌─────────────────────────────────────────────────────────────────────────┐
│                        DATA INGESTION LAYER                             │
│  Confluence  ──►  K8s CronJob(증분추출)  ──►  MinIO AIStor [raw/]      │
│  Web Vendor Docs ──► Firecrawl(Self-hosted) ──► MinIO AIStor [raw/]    │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ Argo Workflows (Event-Driven)
┌──────────────────────────────▼──────────────────────────────────────────┐
│                     INTELLIGENCE PROCESSING LAYER                       │
│  MinIO [raw/]  ──►  LangChain Processor                                │
│                     ├─ Link Resolver (Internal Links 수정)              │
│                     ├─ LLM Classifier (vLLM/Ollama - 폐쇄망)           │
│                     ├─ Metadata YAML 자동 생성                          │
│                     └─ Lifecycle Status 관리                            │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │
┌──────────────────────────────▼──────────────────────────────────────────┐
│                        KNOWLEDGE STORAGE LAYER                          │
│  Git (Bitbucket/GitLab)                                                 │
│  ├─ /raw/         ← MinIO에서 가져온 원본 MD                            │
│  └─ /wiki/        ← LLM이 정제한 구조화 지식                            │
│      ├─ /active/Architecture, SOP, Library, Reports                    │
│      └─ /archive/ ← 폐기된 구버전 문서                                  │
└──────────┬──────────────────────────────────────────────────────────────┘
           │ GitHub Actions / GitLab CI  (wiki/** push trigger)
┌──────────▼──────────────────────────────────────────────────────────────┐
│                         VECTOR INDEX LAYER                              │
│  LangChain Indexing API + RecordManager ──► Milvus (K8s Operator)      │
│  ├─ Hybrid Search: BM25(Keyword) + Vector(Semantic)                     │
│  └─ Metadata Filter: status, category, tech_stack, last_verified       │
└──────────┬──────────────────────────────────────────────────────────────┘
           │
┌──────────▼──────────────────────────────────────────────────────────────┐
│                        AIOPS MONITORING LAYER                           │
│  K8s 실시간 이벤트 (Prometheus/Alertmanager/K8s API)                   │
│      ──► LangGraph Agent                                                │
│           ├─ Node 1: 이벤트 수집 & 증상 분석                            │
│           ├─ Node 2: RAG 검색 (Milvus - 관련 SOP/Library)              │
│           ├─ Node 3: 근본 원인 분석 (Root Cause Analysis)              │
│           ├─ Node 4: 해결책 생성 & 검증                                 │
│           └─ Node 5: 리스크 예측 (Proactive Risk Detection)            │
│      ──► Lens (로컬 PC) + 사내 LLM API 연동                            │
└─────────────────────────────────────────────────────────────────────────┘

2. 문서 생애주기 (Document Lifecycle) 설계

모든 문서는 아래 4단계 상태를 가지며, LLM과 자동화 파이프라인이 상태를 관리합니다.

draft ──► active ──► deprecated ──► archive
                         │
                   (180일 미검증 시 자동 알림)
                         │
                   (365일 후 archive 자동 이동)

표준 Metadata YAML 스키마

---
# === 식별 및 분류 ===
id: "SOP-CIL-001"                         # 문서 고유 ID (자동 생성)
title: "Cilium BGP Control Plane 장애 대응"
category: "SOP"                           # SOP | Library | Architecture | Reports
tech_stack: ["Cilium", "BGP", "K8s"]
sub_category: "Network"

# === 생애주기 관리 (Lifecycle) ===
status: "active"                          # draft | active | deprecated | archive
created_at: "2026-01-15"
last_verified_at: "2026-04-28"            # 기술 검증일 (180일 미갱신 시 알림)
verified_by: "platform-lead"
applies_to_version: "cilium>=1.15"        # 유효 버전 범위
expires_at: null                          # 명시적 만료일 (null이면 자동 관리)

# === 출처 및 추적 ===
source: "confluence"                      # confluence | web | manual | ai-generated
source_url: "https://confluence.internal/pages/12345"
auto_classified: true
pipeline_version: "v1.2"

# === 운영 맥락 (AIOps) ===
severity: "Critical"                      # Critical | High | Medium | Low
environment: ["production", "staging"]
target_audience: "platform-engineer"
contains_code: true
code_languages: ["yaml", "shell"]
related_docs: ["ARCH-NET-002", "LIB-CIL-005"]

# === 검색 최적화 ===
tags: ["bgp-peering", "network-failure", "cilium", "troubleshooting"]
summary: "Cilium BGP 피어링 실패 시 원인 진단 및 단계별 복구 절차"
---

3. 핵심 코드 구현

3-1. Confluence 증분 추출 스크립트 (incremental_export.py)

#!/usr/bin/env python3
"""
Confluence 24시간 증분 추출 → MinIO AIStor 저장
K8s CronJob에서 매일 새벽 02:00 실행
"""
import os
import datetime
import hashlib
import json
from atlassian import Confluence
import html2text
from minio import Minio

# ── 환경 변수 ──────────────────────────────────────────────
CONFLUENCE_URL  = os.getenv("CONFLUENCE_URL")
CONFLUENCE_USER = os.getenv("CONFLUENCE_USER")
CONFLUENCE_TOKEN= os.getenv("CONFLUENCE_TOKEN")
PARENT_PAGE_ID  = os.getenv("PARENT_PAGE_ID")
MINIO_URL       = os.getenv("MINIO_URL")
MINIO_ACCESS    = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET    = os.getenv("MINIO_SECRET_KEY")
BUCKET_NAME     = "confluence-wiki"
EXPORT_DIR      = "/tmp/confluence_export"
HASH_STORE_PATH = "/tmp/hash_store.json"

confluence = Confluence(url=CONFLUENCE_URL, username=CONFLUENCE_USER, password=CONFLUENCE_TOKEN)
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=True)

h = html2text.HTML2Text()
h.ignore_links = False
h.body_width = 0  # 줄바꿈 없이 원형 유지

def load_hash_store():
    """이전 실행의 파일 해시값 로드 (중복 처리 방지)"""
    try:
        resp = minio_client.get_object(BUCKET_NAME, "metadata/hash_store.json")
        return json.loads(resp.read())
    except Exception:
        return {}

def save_hash_store(store: dict):
    data = json.dumps(store).encode()
    from io import BytesIO
    minio_client.put_object(BUCKET_NAME, "metadata/hash_store.json", BytesIO(data), len(data))

def extract_page(page_id: str, title: str) -> str:
    """Confluence 페이지 → Markdown 변환"""
    detail = confluence.get_page_by_id(page_id, expand='body.storage,version,ancestors')
    html_body = detail['body']['storage']['value']
    version = detail['version']['number']
    
    # 첨부 이미지 수집
    attachments = confluence.get_attachments_from_content(page_id)
    attachment_map = {}
    for att in attachments.get('results', []):
        att_title = att['title']
        att_url = f"{CONFLUENCE_URL}{att['_links']['download']}"
        attachment_map[att_title] = f"./assets/{att_title}"
    
    md_content = h.handle(html_body)
    
    # 첨부 이미지 경로를 상대 경로로 교체
    for original_url, relative_path in attachment_map.items():
        md_content = md_content.replace(original_url, relative_path)
    
    # YAML Frontmatter 초안 삽입 (LLM이 나중에 완성)
    safe_title = title.replace("/", "-").replace(" ", "-").lower()
    frontmatter = f"""---
id: "PENDING-{page_id}"
title: "{title}"
source: "confluence"
source_page_id: "{page_id}"
source_version: {version}
status: "draft"
auto_classified: false
created_at: "{datetime.date.today()}"
last_verified_at: "{datetime.date.today()}"
---

"""
    return frontmatter + md_content

def run_incremental_export():
    os.makedirs(EXPORT_DIR, exist_ok=True)
    hash_store = load_hash_store()
    
    # 24시간 이내 변경 페이지만 CQL로 추출
    cql = f'ancestor = {PARENT_PAGE_ID} AND lastModified >= now("-1d") ORDER BY lastModified DESC'
    results = confluence.cql(cql).get('results', [])
    print(f"[INFO] Found {len(results)} updated pages in last 24h")
    
    exported_count = 0
    for item in results:
        page = item.get('content', {})
        page_id = page.get('id')
        title = page.get('title', 'untitled').replace("/", "-")
        
        try:
            md_content = extract_page(page_id, title)
            content_hash = hashlib.sha256(md_content.encode()).hexdigest()
            
            # 해시 비교 → 변경된 파일만 처리
            if hash_store.get(page_id) == content_hash:
                print(f"[SKIP] No change: {title}")
                continue
            
            # MinIO raw/ 버킷에 저장 (날짜별 경로)
            date_prefix = datetime.date.today().strftime("%Y-%m-%d")
            minio_path = f"raw/{date_prefix}/{title}.md"
            data = md_content.encode('utf-8')
            from io import BytesIO
            minio_client.put_object(BUCKET_NAME, minio_path, BytesIO(data), len(data),
                                    content_type="text/markdown")
            
            hash_store[page_id] = content_hash
            exported_count += 1
            print(f"[OK] Exported: {title}{minio_path}")
            
        except Exception as e:
            print(f"[ERROR] Failed to export {title}: {e}")
    
    save_hash_store(hash_store)
    print(f"[DONE] Exported {exported_count} pages to MinIO")

if __name__ == "__main__":
    run_incremental_export()

3-2. LangChain 지능형 분류 처리기 (processor.py)

#!/usr/bin/env python3
"""
MinIO raw/ → LLM 분류/링크수정/메타데이터 생성 → wiki/ 폴더 구조 출력
Argo Workflows langchain-processor Pod 내부에서 실행
"""
import os
import re
import json
import uuid
import datetime
from minio import Minio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Optional
import yaml

MINIO_URL    = os.getenv("MINIO_URL")
MINIO_ACCESS = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET = os.getenv("MINIO_SECRET_KEY")
LLM_ENDPOINT = os.getenv("LLM_ENDPOINT", "http://vllm-service.ai:8000/v1")
BUCKET_NAME  = "confluence-wiki"

minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=True)

# 폐쇄망: vLLM이 OpenAI 호환 API 제공
llm = ChatOpenAI(
    model="llama-3-70b-instruct",
    base_url=LLM_ENDPOINT,
    api_key="internal",
    temperature=0
)

# ── Pydantic 출력 스키마 ─────────────────────────────────────
class DocMetadata(BaseModel):
    id_suffix: str = Field(description="문서 고유 식별자 접미사 (영문 대문자 3자리, 예: NET, K8S, MIO)")
    category: str = Field(description="SOP | Library | Architecture | Reports")
    sub_category: str = Field(description="Network | Storage | Compute | Security | Monitoring")
    tech_stack: List[str] = Field(description="관련 기술 스택 리스트 (예: ['Cilium','BGP','K8s'])")
    severity: str = Field(description="Critical | High | Medium | Low")
    summary: str = Field(description="문서 핵심 내용 한 문장 요약 (한국어)")
    tags: List[str] = Field(description="검색용 핵심 키워드 리스트 (소문자, kebab-case)")
    applies_to_version: Optional[str] = Field(description="유효 버전 (예: 'cilium>=1.15'), 없으면 null")
    contains_code: bool = Field(description="코드 블록 포함 여부")
    code_languages: List[str] = Field(description="포함된 코드 언어 리스트")
    optimized_filename: str = Field(description="파일명 (영문 kebab-case, .md 제외)")
    related_concepts: List[str] = Field(description="관련 인프라 컴포넌트 또는 개념 리스트")

structured_llm = llm.with_structured_output(DocMetadata)

# ── 내부 링크 리졸버 ──────────────────────────────────────────
def resolve_internal_links(content: str, all_filenames: list) -> str:
    """[[WikiLink]] → [WikiLink](./WikiLink.md) 변환 (파일 존재 확인 포함)"""
    def replace_link(match):
        link_text = match.group(1)
        safe_name = link_text.replace(" ", "-").lower()
        # 파일 목록에서 존재 여부 확인
        matched = next((f for f in all_filenames if safe_name in f.lower()), None)
        if matched:
            return f"[{link_text}](./{matched})"
        return f"[{link_text}](./{safe_name}.md)"  # 미래 생성 파일 대비
    
    # [[Link|Alias]] 형식도 처리
    content = re.sub(r'\[\[([^|\]]+)\|([^\]]+)\]\]', 
                     lambda m: f"[{m.group(2)}](./{m.group(1).replace(' ','-').lower()}.md)", 
                     content)
    # [[Link]] 형식 처리
    content = re.sub(r'\[\[([^\]]+)\]\]', replace_link, content)
    return content

# ── 중복 문서 감지 ────────────────────────────────────────────
def check_duplicate_in_wiki(new_content: str, wiki_path: str) -> Optional[str]:
    """기존 wiki 파일과 의미적 유사도 체크 (간단 버전: 제목+핵심키워드 비교)"""
    try:
        objects = minio_client.list_objects(BUCKET_NAME, prefix="wiki/active/", recursive=True)
        new_words = set(re.findall(r'\b[A-Za-z]{4,}\b', new_content[:2000]))
        
        for obj in objects:
            if not obj.object_name.endswith('.md'):
                continue
            resp = minio_client.get_object(BUCKET_NAME, obj.object_name)
            existing = resp.read().decode('utf-8')
            existing_words = set(re.findall(r'\b[A-Za-z]{4,}\b', existing[:2000]))
            
            # 70% 이상 단어 겹침 → 중복 의심
            if len(new_words) > 0:
                overlap = len(new_words & existing_words) / len(new_words | existing_words)
                if overlap > 0.7:
                    return obj.object_name
    except Exception:
        pass
    return None

# ── 메인 분류 프로세서 ────────────────────────────────────────
def process_raw_documents():
    """MinIO raw/[today]/ 의 모든 MD 파일을 처리하여 wiki/active/ 로 저장"""
    today = datetime.date.today().strftime("%Y-%m-%d")
    prefix = f"raw/{today}/"
    
    objects = list(minio_client.list_objects(BUCKET_NAME, prefix=prefix, recursive=True))
    all_filenames = [os.path.basename(obj.object_name) for obj in objects]
    print(f"[INFO] Processing {len(objects)} files from {prefix}")
    
    category_counter = {"SOP": 0, "Library": 0, "Architecture": 0, "Reports": 0}
    
    for obj in objects:
        if not obj.object_name.endswith('.md'):
            continue
        
        try:
            # 1. 파일 읽기
            resp = minio_client.get_object(BUCKET_NAME, obj.object_name)
            raw_content = resp.read().decode('utf-8')
            
            # 기존 YAML Frontmatter 제거 (재생성 예정)
            body = re.sub(r'^---\n.*?\n---\n', '', raw_content, flags=re.DOTALL).strip()
            
            # 2. 내부 링크 수정
            refined = resolve_internal_links(body, all_filenames)
            
            # 3. LLM 메타데이터 분류
            prompt = ChatPromptTemplate.from_template("""
당신은 플랫폼 엔지니어링 지식 관리 전문가입니다.
다음 인프라 기술 문서를 분석하여 정확한 메타데이터를 추출하세요.
문서 컨텍스트: K8s 1,000노드 클러스터, Cilium CNI, MinIO AIStor 환경

문서 내용 (앞 2000자):
{content}
""")
            chain = prompt | structured_llm
            meta: DocMetadata = chain.invoke({"content": refined[:2000]})
            
            # 4. 카테고리별 ID 생성
            cat_short = {"SOP": "SOP", "Library": "LIB", "Architecture": "ARCH", "Reports": "RPT"}
            cat_key = cat_short.get(meta.category, "DOC")
            category_counter[meta.category] = category_counter.get(meta.category, 0) + 1
            doc_id = f"{cat_key}-{meta.id_suffix}-{category_counter.get(meta.category, 1):03d}"
            
            # 5. 완전한 YAML Frontmatter 생성
            frontmatter = {
                "id": doc_id,
                "title": meta.optimized_filename.replace("-", " ").title(),
                "category": meta.category,
                "sub_category": meta.sub_category,
                "tech_stack": meta.tech_stack,
                "status": "active",
                "severity": meta.severity,
                "summary": meta.summary,
                "tags": meta.tags,
                "applies_to_version": meta.applies_to_version,
                "contains_code": meta.contains_code,
                "code_languages": meta.code_languages,
                "related_docs": [],
                "related_concepts": meta.related_concepts,
                "source": "confluence",
                "auto_classified": True,
                "created_at": str(datetime.date.today()),
                "last_verified_at": str(datetime.date.today()),
                "verified_by": "auto-pipeline",
                "environment": ["production"],
                "expires_at": None,
                "pipeline_version": "v1.2"
            }
            
            final_content = f"---\n{yaml.dump(frontmatter, allow_unicode=True, default_flow_style=False)}---\n\n{refined}"
            
            # 6. 중복 체크
            duplicate_path = check_duplicate_in_wiki(refined, obj.object_name)
            if duplicate_path:
                # 기존 파일을 deprecated로 상태 변경
                print(f"[WARN] Duplicate detected: {duplicate_path} → marking as deprecated")
                _deprecate_document(duplicate_path)
            
            # 7. wiki/active/[Category]/[SubCat]/ 경로로 저장
            target_path = f"wiki/active/{meta.category}/{meta.sub_category}/{meta.optimized_filename}.md"
            data = final_content.encode('utf-8')
            from io import BytesIO
            minio_client.put_object(BUCKET_NAME, target_path, BytesIO(data), len(data),
                                    content_type="text/markdown")
            
            print(f"[OK] Classified: {obj.object_name}{target_path} [{doc_id}]")
            
        except Exception as e:
            print(f"[ERROR] Failed to process {obj.object_name}: {e}")
            import traceback; traceback.print_exc()

def _deprecate_document(minio_path: str):
    """기존 문서의 status를 deprecated로 변경"""
    try:
        resp = minio_client.get_object(BUCKET_NAME, minio_path)
        content = resp.read().decode('utf-8')
        content = re.sub(r'status: "active"', 'status: "deprecated"', content)
        data = content.encode('utf-8')
        from io import BytesIO
        minio_client.put_object(BUCKET_NAME, minio_path, BytesIO(data), len(data))
    except Exception as e:
        print(f"[ERROR] Failed to deprecate {minio_path}: {e}")

if __name__ == "__main__":
    process_raw_documents()

3-3. Git 동기화 스크립트 (sync_to_git.sh)

#!/bin/bash
# MinIO wiki/ 내용을 Git 저장소에 동기화
# Argo Workflows git-sync Pod에서 실행

set -euo pipefail

GIT_REPO_URL="https://${GIT_TOKEN}@${GIT_HOST}/platform/llm-wiki.git"
CLONE_DIR="/workspace/llm-wiki"
BUCKET="confluence-wiki"

echo "[STEP 1] Git clone or pull"
if [ ! -d "$CLONE_DIR/.git" ]; then
    git clone "$GIT_REPO_URL" "$CLONE_DIR"
fi
cd "$CLONE_DIR"
git config user.email "aiops-bot@internal.com"
git config user.name "AIOps Pipeline Bot"
git pull origin main --rebase

echo "[STEP 2] Sync MinIO wiki/ → local"
mc alias set myminio "${MINIO_URL}" "${MINIO_ACCESS_KEY}" "${MINIO_SECRET_KEY}"
mc mirror myminio/confluence-wiki/wiki/ "$CLONE_DIR/wiki/" --overwrite

echo "[STEP 3] Archive lifecycle check (365일 초과 deprecated 문서 자동 이동)"
python3 /app/lifecycle_manager.py --action archive --clone-dir "$CLONE_DIR"

echo "[STEP 4] Git commit & push"
git add wiki/
CHANGED=$(git diff --cached --name-only | wc -l)
if [ "$CHANGED" -gt "0" ]; then
    COMMIT_MSG="Auto-sync: ${CHANGED} docs updated [$(date +%Y-%m-%d %H:%M)] via AIOps Pipeline"
    git commit -m "$COMMIT_MSG"
    git push origin main
    echo "[OK] Pushed $CHANGED changed files to Git"
else
    echo "[SKIP] No changes detected, nothing to push"
fi

echo "[DONE] Git sync completed"

3-4. 문서 생애주기 관리 (lifecycle_manager.py)

#!/usr/bin/env python3
"""
문서 생애주기 자동 관리
- 180일 미검증 → Slack 알림
- 365일 deprecated → archive 자동 이동
- Argo CronJob 또는 Git sync 후 호출
"""
import os
import re
import yaml
import datetime
import argparse
import requests
from pathlib import Path

SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
ARCHIVE_DAYS  = 365
ALERT_DAYS    = 180

def parse_frontmatter(content: str) -> dict:
    match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
    if match:
        try:
            return yaml.safe_load(match.group(1)) or {}
        except Exception:
            return {}
    return {}

def update_frontmatter(content: str, updates: dict) -> str:
    match = re.match(r'^(---\n)(.*?)(\n---)', content, re.DOTALL)
    if not match:
        return content
    try:
        fm = yaml.safe_load(match.group(2)) or {}
        fm.update(updates)
        new_fm = yaml.dump(fm, allow_unicode=True, default_flow_style=False).strip()
        return f"---\n{new_fm}\n---{content[match.end():]}"
    except Exception:
        return content

def send_slack_alert(doc_id: str, title: str, last_verified: str, file_path: str):
    if not SLACK_WEBHOOK:
        print(f"[ALERT] {doc_id} ({title}): 검증 기간 초과 - {last_verified}")
        return
    msg = {
        "blocks": [
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f":warning: *문서 검증 기간 초과 알림*\n"
                     f"*ID:* `{doc_id}`\n*제목:* {title}\n"
                     f"*마지막 검증:* {last_verified}\n*경로:* `{file_path}`"}},
            {"type": "actions", "elements": [
                {"type": "button", "text": {"type": "plain_text", "text": "검증 완료로 표시"},
                 "style": "primary", "value": doc_id},
                {"type": "button", "text": {"type": "plain_text", "text": "아카이브"},
                 "style": "danger", "value": f"archive:{doc_id}"}
            ]}
        ]
    }
    requests.post(SLACK_WEBHOOK, json=msg)

def run_lifecycle_check(clone_dir: str, action: str = "check"):
    today = datetime.date.today()
    wiki_path = Path(clone_dir) / "wiki" / "active"
    archived_count = 0
    alerted_count  = 0
    
    for md_file in wiki_path.rglob("*.md"):
        content = md_file.read_text(encoding='utf-8')
        fm = parse_frontmatter(content)
        if not fm:
            continue
        
        status = fm.get("status", "active")
        last_verified_str = str(fm.get("last_verified_at", ""))
        doc_id = fm.get("id", "UNKNOWN")
        title  = fm.get("title", str(md_file.name))
        
        try:
            last_verified = datetime.date.fromisoformat(last_verified_str)
        except (ValueError, TypeError):
            last_verified = today - datetime.timedelta(days=ALERT_DAYS + 1)
        
        age_days = (today - last_verified).days
        
        # 365일 초과 deprecated → archive 폴더로 자동 이동
        if status == "deprecated" and age_days >= ARCHIVE_DAYS and action == "archive":
            archive_target = Path(clone_dir) / "wiki" / "archive" / md_file.relative_to(wiki_path)
            archive_target.parent.mkdir(parents=True, exist_ok=True)
            
            updated = update_frontmatter(content, {
                "status": "archive",
                "archived_at": str(today),
                "archive_reason": f"Auto-archived after {age_days} days without verification"
            })
            archive_target.write_text(updated, encoding='utf-8')
            md_file.unlink()
            archived_count += 1
            print(f"[ARCHIVE] {doc_id}: {md_file.name} → archive/")
        
        # 180일 초과 active → Slack 경고
        elif status == "active" and age_days >= ALERT_DAYS:
            if action in ("check", "alert"):
                send_slack_alert(doc_id, title, last_verified_str, str(md_file))
                alerted_count += 1
    
    print(f"[LIFECYCLE] archived={archived_count}, alerted={alerted_count}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--action", choices=["check", "alert", "archive"], default="check")
    parser.add_argument("--clone-dir", default="/workspace/llm-wiki")
    args = parser.parse_args()
    run_lifecycle_check(args.clone_dir, args.action)

3-5. Vector DB 인덱싱 파이프라인 (indexer.py)

#!/usr/bin/env python3
"""
Git wiki/ → Milvus Vector DB 증분 인덱싱
LangChain RecordManager로 중복/삭제 자동 동기화
Hybrid Search: Dense(Embedding) + Sparse(BM25) 지원
"""
import os
import re
from pathlib import Path
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings  # 폐쇄망용
from langchain_community.vectorstores import Milvus
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
import yaml

WIKI_DIR    = os.getenv("WIKI_DIR", "/workspace/llm-wiki/wiki/active")
MILVUS_HOST = os.getenv("MILVUS_HOST", "milvus-service.storage")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
MODEL_PATH  = os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3")  # 폐쇄망 로컬 모델
COLLECTION  = "platform_ops_wiki"
DB_URL      = os.getenv("RECORD_MANAGER_DB", "sqlite:////data/record_manager.db")

# ── 임베딩 모델 (폐쇄망: HuggingFace 로컬 모델) ──────────────
embeddings = HuggingFaceEmbeddings(
    model_name=MODEL_PATH,
    model_kwargs={'device': 'cuda'},
    encode_kwargs={'normalize_embeddings': True}
)

# ── Vector Store ──────────────────────────────────────────────
vector_store = Milvus(
    embedding_function=embeddings,
    collection_name=COLLECTION,
    connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
    index_params={"metric_type": "COSINE"},
)

# ── RecordManager: 중복/삭제 자동 동기화 ──────────────────────
record_manager = SQLRecordManager(namespace=f"milvus/{COLLECTION}", db_url=DB_URL)
record_manager.create_schema()

# ── 마크다운 헤더 기반 청킹 ────────────────────────────────────
HEADERS_TO_SPLIT = [("#", "h1"), ("##", "h2"), ("###", "h3")]
text_splitter = MarkdownHeaderTextSplitter(HEADERS_TO_SPLIT, strip_headers=False)

def parse_frontmatter(content: str) -> dict:
    match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
    if match:
        try:
            return yaml.safe_load(match.group(1)) or {}
        except Exception:
            return {}
    return {}

def load_documents(wiki_dir: str) -> list[Document]:
    """wiki/active/ 모든 MD 파일 → LangChain Document 리스트 (메타데이터 포함)"""
    docs = []
    for md_path in Path(wiki_dir).rglob("*.md"):
        content = md_path.read_text(encoding='utf-8')
        fm = parse_frontmatter(content)
        
        # status가 active인 문서만 인덱싱
        if fm.get("status") not in ("active", None):
            continue
        
        # YAML Frontmatter 제거 후 본문만 청킹
        body = re.sub(r'^---\n.*?\n---\n', '', content, flags=re.DOTALL).strip()
        chunks = text_splitter.split_text(body)
        
        for chunk in chunks:
            # 청킹된 각 조각에 문서 메타데이터 부착
            chunk.metadata.update({
                "doc_id":       fm.get("id", ""),
                "category":     fm.get("category", ""),
                "sub_category": fm.get("sub_category", ""),
                "tech_stack":   ",".join(fm.get("tech_stack", [])),
                "severity":     fm.get("severity", ""),
                "status":       fm.get("status", "active"),
                "tags":         ",".join(fm.get("tags", [])),
                "summary":      fm.get("summary", ""),
                "last_verified":str(fm.get("last_verified_at", "")),
                "file_path":    str(md_path.relative_to(wiki_dir)),
                "source":       fm.get("source", ""),
            })
            docs.append(chunk)
    
    return docs

def run_indexing():
    print(f"[INFO] Loading documents from {WIKI_DIR}")
    docs = load_documents(WIKI_DIR)
    print(f"[INFO] Total chunks to index: {len(docs)}")
    
    # cleanup="incremental": 변경된 문서만 업데이트, 삭제된 문서는 DB에서도 삭제
    result = index(
        docs,
        record_manager,
        vector_store,
        cleanup="incremental",
        source_id_key="doc_id"
    )
    print(f"[DONE] Indexing result: {result}")

if __name__ == "__main__":
    run_indexing()

3-6. LangGraph AIOps 모니터링 에이전트 (aiops_agent.py)

#!/usr/bin/env python3
"""
K8s 실시간 이벤트 + RAG(Wiki) 기반 AIOps 에이전트
기능:
  1. 이벤트 수집 & 증상 분석
  2. 관련 SOP/Library RAG 검색
  3. 근본 원인 분석 (Root Cause Analysis)
  4. 해결책 생성 & 실행 가능 명령어 제시
  5. 리스크 예측 (Proactive Risk Detection)
"""
import os
import json
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Milvus
from kubernetes import client, config

LLM_ENDPOINT  = os.getenv("LLM_ENDPOINT", "http://vllm-service.ai:8000/v1")
MILVUS_HOST   = os.getenv("MILVUS_HOST", "milvus-service.storage")
MODEL_PATH    = os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3")
COLLECTION    = "platform_ops_wiki"

llm = ChatOpenAI(model="llama-3-70b-instruct", base_url=LLM_ENDPOINT, api_key="internal", temperature=0)

embeddings = HuggingFaceEmbeddings(model_name=MODEL_PATH, model_kwargs={'device': 'cpu'})
vector_store = Milvus(
    embedding_function=embeddings,
    collection_name=COLLECTION,
    connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
)

# ── K8s 클라이언트 초기화 ─────────────────────────────────────
try:
    config.load_incluster_config()
except Exception:
    config.load_kube_config()

k8s_core = client.CoreV1Api()
k8s_apps = client.AppsV1Api()

# ── 에이전트 상태 정의 ────────────────────────────────────────
class AIOpsState(TypedDict):
    messages: Annotated[list, add_messages]
    event_raw: str                    # 원본 K8s 이벤트/알람
    symptoms: str                     # 분석된 증상
    search_query: str                 # RAG 검색 쿼리
    retrieved_docs: List[dict]        # 검색된 관련 문서
    root_cause: str                   # 근본 원인 분석 결과
    solution: str                     # 제안된 해결책 + 명령어
    risk_prediction: str              # 예측된 리스크
    k8s_context: dict                 # K8s 클러스터 실시간 상태
    retry_count: int                  # 재시도 횟수 (루프 제어)
    confidence: float                 # 분석 신뢰도 (0~1)

# ── Node 1: K8s 컨텍스트 수집 ──────────────────────────────────
def collect_k8s_context(state: AIOpsState) -> AIOpsState:
    """이벤트와 관련된 K8s 실시간 상태 수집"""
    ctx = {}
    event_text = state["event_raw"]
    
    try:
        # 네임스페이스 추출 시도
        ns_match = __import__('re').search(r'namespace[=: ]+(\S+)', event_text, re.IGNORECASE)
        namespace = ns_match.group(1) if ns_match else "kube-system"
        
        # 최근 이벤트 수집
        events = k8s_core.list_namespaced_event(
            namespace=namespace,
            limit=20,
            field_selector="type=Warning"
        )
        ctx["recent_warnings"] = [
            {"reason": e.reason, "message": e.message, "object": e.involved_object.name}
            for e in events.items
        ]
        
        # 비정상 파드 수집
        pods = k8s_core.list_namespaced_pod(namespace=namespace)
        ctx["unhealthy_pods"] = [
            {"name": p.metadata.name, "phase": p.status.phase,
             "conditions": [c.type for c in (p.status.conditions or []) if not c.status == "True"]}
            for p in pods.items
            if p.status.phase not in ("Running", "Succeeded")
        ]
        
        # 노드 상태
        nodes = k8s_core.list_node()
        ctx["node_summary"] = {
            "total": len(nodes.items),
            "not_ready": [n.metadata.name for n in nodes.items
                          if not any(c.type == "Ready" and c.status == "True"
                                     for c in (n.status.conditions or []))]
        }
        
    except Exception as e:
        ctx["error"] = str(e)
    
    state["k8s_context"] = ctx
    return state

# ── Node 2: 증상 분석 ──────────────────────────────────────────
def analyze_symptoms(state: AIOpsState) -> AIOpsState:
    """이벤트 + K8s 컨텍스트 → 증상 구조화"""
    prompt = f"""
당신은 K8s 인프라 전문가입니다. 다음 이벤트와 클러스터 상태를 분석하여 정확한 증상을 파악하세요.

## 원본 이벤트/알람:
{state['event_raw']}

## 실시간 K8s 상태:
{json.dumps(state['k8s_context'], ensure_ascii=False, indent=2)}

다음 형식으로 분석하세요:
1. **주요 증상**: 가장 명확한 이상 현상
2. **영향 범위**: 영향받는 컴포넌트/네임스페이스/노드
3. **심각도**: Critical/High/Medium/Low
4. **검색 키워드**: 관련 문서 검색에 쓸 핵심 키워드 (3~5개)
"""
    response = llm.invoke([HumanMessage(content=prompt)])
    state["symptoms"] = response.content
    
    # 검색 쿼리 추출
    keywords = __import__('re').findall(r'\*\*검색 키워드\*\*:?\s*(.+)', response.content)
    state["search_query"] = keywords[0] if keywords else state["event_raw"][:200]
    
    return state

# ── Node 3: RAG 검색 (Hybrid Search) ─────────────────────────
def retrieve_knowledge(state: AIOpsState) -> AIOpsState:
    """Milvus Hybrid Search: 의미 검색 + 메타데이터 필터링"""
    query = state["search_query"]
    
    # 카테고리 필터: SOP 우선 검색
    sop_results = vector_store.similarity_search_with_score(
        query,
        k=3,
        expr='category == "SOP" and status == "active"'
    )
    
    # Library/Architecture도 추가 검색
    lib_results = vector_store.similarity_search_with_score(
        query,
        k=3,
        expr='(category == "Library" or category == "Architecture") and status == "active"'
    )
    
    all_results = sop_results + lib_results
    # 유사도 점수 기준 정렬 (낮을수록 유사)
    all_results.sort(key=lambda x: x[1])
    
    retrieved = []
    for doc, score in all_results[:5]:
        retrieved.append({
            "doc_id":   doc.metadata.get("doc_id"),
            "category": doc.metadata.get("category"),
            "severity": doc.metadata.get("severity"),
            "content":  doc.page_content,
            "score":    round(score, 4),
            "file_path":doc.metadata.get("file_path"),
        })
    
    state["retrieved_docs"] = retrieved
    return state

# ── Node 4: 근본 원인 분석 ────────────────────────────────────
def analyze_root_cause(state: AIOpsState) -> AIOpsState:
    """증상 + 검색 결과 → 근본 원인 분석 + 신뢰도 평가"""
    docs_context = "\n\n---\n".join([
        f"[{d['doc_id']}] (유사도: {d['score']})\n{d['content']}"
        for d in state["retrieved_docs"]
    ])
    
    prompt = f"""
당신은 1,000노드 K8s 클러스터 운영 전문가입니다.

## 분석된 증상:
{state['symptoms']}

## 참조 문서 (SOP/Library):
{docs_context}

## K8s 실시간 상태:
{json.dumps(state['k8s_context'], ensure_ascii=False)}

근본 원인을 분석하세요:

1. **근본 원인**: 가장 가능성 높은 원인 (1~2가지)
2. **원인 근거**: 증상과 참조 문서를 연결한 논리적 근거
3. **관련 컴포넌트**: 직접 관련된 K8s/Cilium/MinIO 컴포넌트
4. **분석 신뢰도**: 0.0~1.0 (참조 문서의 관련성이 낮으면 낮게 평가)
"""
    response = llm.invoke([HumanMessage(content=prompt)])
    state["root_cause"] = response.content
    
    # 신뢰도 추출
    conf_match = __import__('re').search(r'분석 신뢰도.*?([0-9]\.[0-9]+)', response.content)
    state["confidence"] = float(conf_match.group(1)) if conf_match else 0.5
    
    return state

# ── Node 5: 해결책 생성 ────────────────────────────────────────
def generate_solution(state: AIOpsState) -> AIOpsState:
    """근본 원인 → 단계별 해결책 + 실행 가능한 명령어 생성"""
    prompt = f"""
## 근본 원인 분석:
{state['root_cause']}

## 관련 SOP/Library 요약:
{chr(10).join([d['content'][:500] for d in state['retrieved_docs'][:2]])}

즉시 실행 가능한 해결책을 제시하세요:

1. **즉각 조치 (0~5)**: 피해 최소화를 위한 긴급 조치
2. **단계별 복구 절차**: 순서대로 실행할 kubectl/cilium 명령어 포함
3. **검증 방법**: 복구 성공 여부를 확인하는 방법
4. **재발 방지**: 같은 이슈가 재발하지 않도록 하는 설정 변경

```bash
# 즉각 조치 명령어 예시
# (실제 리소스명으로 교체 필요)

주의사항: 이 명령어는 Production 환경에 적용 전 반드시 검토하세요.
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["solution"] = response.content
return state

── Node 6: 리스크 예측 ────────────────────────────────────────

def predict_risks(state: AIOpsState) -> AIOpsState:
"""현재 상태 기반 근시일 내 발생 가능한 리스크 예측"""
prompt = f"""

현재 장애 상황:

{state['symptoms']}

K8s 클러스터 상태:

{json.dumps(state['k8s_context'], ensure_ascii=False)}

이미 수집된 운영 지식:

{chr(10).join([d['content'][:300] for d in state['retrieved_docs'][:3]])}

현재 상황을 방치하거나 부분 조치만 취했을 때 발생 가능한 리스크를 예측하세요:

  1. 단기 리스크 (24시간 내): 연쇄 장애 가능성
  2. 중기 리스크 (1주일 내): 성능 저하 또는 데이터 손실 위험
  3. 선제 조치 권장: 리스크를 사전에 차단하기 위한 예방 조치

리스크별 확률(%)과 영향도(Critical/High/Medium)도 표시하세요.
"""
response = llm.invoke([HumanMessage(content=prompt)])
state["risk_prediction"] = response.content
return state

── 조건부 엣지: 신뢰도 낮으면 재검색 ───────────────────────

def should_retry_search(state: AIOpsState) -> str:
if state["confidence"] < 0.4 and state["retry_count"] < 2:
state["retry_count"] += 1
state["search_query"] = state["symptoms"][:300] # 더 넓은 쿼리로 재시도
print(f"[RETRY] Low confidence ({state['confidence']}), retrying search...")
return "retrieve_knowledge"
return "generate_solution"

── LangGraph 워크플로우 구성 ─────────────────────────────────

def build_aiops_graph() -> StateGraph:
graph = StateGraph(AIOpsState)

graph.add_node("collect_k8s_context", collect_k8s_context)
graph.add_node("analyze_symptoms",    analyze_symptoms)
graph.add_node("retrieve_knowledge",  retrieve_knowledge)
graph.add_node("analyze_root_cause",  analyze_root_cause)
graph.add_node("generate_solution",   generate_solution)
graph.add_node("predict_risks",       predict_risks)

graph.set_entry_point("collect_k8s_context")
graph.add_edge("collect_k8s_context", "analyze_symptoms")
graph.add_edge("analyze_symptoms",    "retrieve_knowledge")
graph.add_edge("retrieve_knowledge",  "analyze_root_cause")

# 조건부 엣지: 신뢰도 낮으면 재검색 루프
graph.add_conditional_edges("analyze_root_cause", should_retry_search,
                            {"retrieve_knowledge": "retrieve_knowledge",
                             "generate_solution": "generate_solution"})
graph.add_edge("generate_solution", "predict_risks")
graph.add_edge("predict_risks",     END)

return graph.compile()

def analyze_event(event_text: str) -> dict:
"""외부에서 호출하는 메인 분석 함수"""
graph = build_aiops_graph()
initial_state = AIOpsState(
messages=[],
event_raw=event_text,
symptoms="",
search_query="",
retrieved_docs=[],
root_cause="",
solution="",
risk_prediction="",
k8s_context={},
retry_count=0,
confidence=0.0,
)
result = graph.invoke(initial_state)
return {
"symptoms": result["symptoms"],
"root_cause": result["root_cause"],
"solution": result["solution"],
"risk_prediction": result["risk_prediction"],
"confidence": result["confidence"],
"referenced_docs": [d["doc_id"] for d in result["retrieved_docs"]],
}

if name == "main":

# 테스트 이벤트
test_event = """
ALERT: CiliumNetworkPolicy not enforced
Namespace: production
Node: k8s-worker-042
Message: BGP peer 10.0.0.1 connection lost, routes not propagated
Timestamp: 2026-04-29T03:14:00Z
"""
result = analyze_event(test_event)
import json
print(json.dumps(result, ensure_ascii=False, indent=2))

---

### 3-7. Argo Workflows 전체 파이프라인 YAML

```yaml
# argo-pipeline.yaml
# kubectl apply -f argo-pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: llm-wiki-pipeline
  namespace: platform-ops
spec:
  schedule: "0 2 * * *"     # 매일 새벽 2시
  timezone: "Asia/Seoul"
  concurrencyPolicy: Forbid  # 중복 실행 방지
  workflowSpec:
    serviceAccountName: argo-wiki-sa
    entrypoint: main-pipeline
    
    # 공유 볼륨 (파이프라인 단계 간 데이터 공유)
    volumes:
    - name: workspace
      persistentVolumeClaim:
        claimName: wiki-pipeline-pvc
    
    templates:
    # ── 메인 DAG ───────────────────────────────────────────────
    - name: main-pipeline
      dag:
        tasks:
        - name: step1-extract
          template: confluence-extractor
        
        - name: step2-process
          dependencies: [step1-extract]
          template: langchain-processor
        
        - name: step3-lifecycle
          dependencies: [step2-process]
          template: lifecycle-manager
        
        - name: step4-git-sync
          dependencies: [step3-lifecycle]
          template: git-syncer
        
        - name: step5-indexing
          dependencies: [step4-git-sync]
          template: vector-indexer
        
        - name: step6-notify
          dependencies: [step5-indexing]
          template: slack-notifier
    
    # ── Step 1: Confluence 증분 추출 ───────────────────────────
    - name: confluence-extractor
      container:
        image: internal-reg.com/wiki-pipeline/extractor:v1.2
        command: ["python", "incremental_export.py"]
        resources:
          requests: {cpu: "500m", memory: "1Gi"}
          limits:   {cpu: "2", memory: "2Gi"}
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
        volumeMounts:
        - name: workspace
          mountPath: /workspace
    
    # ── Step 2: LLM 분류 처리 (GPU 활용) ──────────────────────
    - name: langchain-processor
      container:
        image: internal-reg.com/wiki-pipeline/processor:v1.2
        command: ["python", "processor.py"]
        resources:
          requests: {cpu: "2", memory: "8Gi", "nvidia.com/gpu": "1"}
          limits:   {cpu: "8", memory: "16Gi", "nvidia.com/gpu": "1"}
        nodeSelector:
          accelerator: nvidia-gpu
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
        env:
        - name: LLM_ENDPOINT
          value: "http://vllm-service.ai-namespace:8000/v1"
        volumeMounts:
        - name: workspace
          mountPath: /workspace
    
    # ── Step 3: 생애주기 관리 ──────────────────────────────────
    - name: lifecycle-manager
      container:
        image: internal-reg.com/wiki-pipeline/processor:v1.2
        command: ["python", "lifecycle_manager.py", "--action", "archive",
                  "--clone-dir", "/workspace/llm-wiki"]
        resources:
          requests: {cpu: "200m", memory: "512Mi"}
          limits:   {cpu: "1", memory: "1Gi"}
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
        volumeMounts:
        - name: workspace
          mountPath: /workspace
    
    # ── Step 4: Git 동기화 ─────────────────────────────────────
    - name: git-syncer
      container:
        image: internal-reg.com/wiki-pipeline/git-tools:v1.0
        command: ["/bin/bash", "/app/sync_to_git.sh"]
        resources:
          requests: {cpu: "200m", memory: "256Mi"}
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
        volumeMounts:
        - name: workspace
          mountPath: /workspace
    
    # ── Step 5: Vector DB 인덱싱 ──────────────────────────────
    - name: vector-indexer
      container:
        image: internal-reg.com/wiki-pipeline/indexer:v1.2
        command: ["python", "indexer.py"]
        resources:
          requests: {cpu: "1", memory: "4Gi"}
          limits:   {cpu: "4", memory: "8Gi"}
        env:
        - name: WIKI_DIR
          value: "/workspace/llm-wiki/wiki/active"
        - name: EMBEDDING_MODEL_PATH
          value: "/models/bge-m3"
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
        volumeMounts:
        - name: workspace
          mountPath: /workspace
        - name: models
          mountPath: /models
          readOnly: true
      volumes:
      - name: models
        hostPath:
          path: /data/models
    
    # ── Step 6: 완료 알림 ─────────────────────────────────────
    - name: slack-notifier
      container:
        image: curlimages/curl:latest
        command: [sh, -c]
        args:
        - |
          curl -X POST $SLACK_WEBHOOK_URL \
            -H 'Content-type: application/json' \
            -d '{"text":"✅ LLM Wiki 파이프라인 완료 ('"$(date '+%Y-%m-%d %H:%M')"')\n자세한 내용: Argo UI 확인"}'
        envFrom:
        - secretRef:
            name: wiki-pipeline-secrets
---
# K8s Secret (실제 배포 시 Vault 또는 Sealed Secrets 권장)
apiVersion: v1
kind: Secret
metadata:
  name: wiki-pipeline-secrets
  namespace: platform-ops
type: Opaque
stringData:
  CONFLUENCE_URL:   "https://your-domain.atlassian.net/wiki"
  CONFLUENCE_USER:  "admin@company.com"
  CONFLUENCE_TOKEN: "your-api-token"
  PARENT_PAGE_ID:   "12345678"
  MINIO_URL:        "minio.storage.svc.cluster.local:9000"
  MINIO_ACCESS_KEY: "minio-user"
  MINIO_SECRET_KEY: "minio-password"
  GIT_TOKEN:        "your-git-token"
  GIT_HOST:         "bitbucket.internal.com"
  MILVUS_HOST:      "milvus.storage.svc.cluster.local"
  SLACK_WEBHOOK_URL:"https://hooks.slack.com/services/..."

3-8. AIOps API 서버 (api_server.py)

Lens 로컬 PC 및 모니터링 시스템에서 에이전트를 호출하는 FastAPI 엔드포인트입니다.

#!/usr/bin/env python3
"""
AIOps Agent FastAPI 서버
- Lens(로컬 PC) 에서 HTTP 호출
- Alertmanager Webhook 수신
- 모니터링 시스템 연동
"""
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
from aiops_agent import analyze_event  # 위의 LangGraph 에이전트 임포트
import uvicorn

app = FastAPI(title="AIOps Agent API", version="1.0.0")

class EventRequest(BaseModel):
    event_text: str
    source: str = "manual"        # manual | alertmanager | prometheus | lens
    namespace: Optional[str] = None
    severity: Optional[str] = None

class AlertmanagerWebhook(BaseModel):
    """Prometheus Alertmanager Webhook 형식"""
    alerts: list
    groupLabels: dict
    commonAnnotations: dict

# ── 즉시 분석 엔드포인트 ─────────────────────────────────────
@app.post("/analyze")
async def analyze(req: EventRequest):
    """이벤트 텍스트 즉시 분석 (Lens 또는 사내 모니터링 시스템에서 호출)"""
    result = await asyncio.get_event_loop().run_in_executor(
        None, analyze_event, req.event_text
    )
    return {
        "status": "ok",
        "source": req.source,
        "analysis": result
    }

# ── Alertmanager Webhook 수신 ─────────────────────────────────
@app.post("/webhook/alertmanager")
async def alertmanager_webhook(payload: AlertmanagerWebhook, background_tasks: BackgroundTasks):
    """Prometheus Alertmanager에서 실시간 알람 수신 → 백그라운드 분석"""
    for alert in payload.alerts:
        if alert.get("status") == "firing":
            event_text = (
                f"ALERT: {alert.get('labels', {}).get('alertname', 'Unknown')}\n"
                f"Namespace: {alert.get('labels', {}).get('namespace', 'unknown')}\n"
                f"Severity: {alert.get('labels', {}).get('severity', 'unknown')}\n"
                f"Summary: {alert.get('annotations', {}).get('summary', '')}\n"
                f"Description: {alert.get('annotations', {}).get('description', '')}"
            )
            background_tasks.add_task(_analyze_and_notify, event_text)
    return {"status": "received"}

async def _analyze_and_notify(event_text: str):
    """백그라운드: 분석 후 Slack으로 결과 전송"""
    import httpx, os
    result = await asyncio.get_event_loop().run_in_executor(None, analyze_event, event_text)
    
    slack_msg = {
        "blocks": [
            {"type": "header", "text": {"type": "plain_text",
             "text": f"🚨 AIOps 장애 분석 결과 (신뢰도: {result['confidence']:.0%})"}},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*📋 증상 요약*\n{result['symptoms'][:500]}"}},
            {"type": "divider"},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*🔍 근본 원인*\n{result['root_cause'][:600]}"}},
            {"type": "divider"},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*🛠 해결책*\n{result['solution'][:600]}"}},
            {"type": "divider"},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*⚠️ 리스크 예측*\n{result['risk_prediction'][:400]}"}},
            {"type": "context", "elements": [
                {"type": "mrkdwn",
                 "text": f"참조 문서: {', '.join(result['referenced_docs'])}"}
            ]}
        ]
    }
    
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if webhook_url:
        async with httpx.AsyncClient() as client:
            await client.post(webhook_url, json=slack_msg)

# ── 리스크 예측 전용 엔드포인트 ──────────────────────────────
@app.get("/risk-scan")
async def proactive_risk_scan():
    """정기 리스크 스캔 (cron으로 매시간 호출 가능)"""
    from kubernetes import client as k8s, config
    config.load_incluster_config()
    v1 = k8s.CoreV1Api()
    
    # 경고 이벤트 수집
    events = v1.list_event_for_all_namespaces(
        field_selector="type=Warning", limit=50
    )
    event_summary = "\n".join([
        f"{e.involved_object.namespace}/{e.involved_object.name}: {e.reason} - {e.message}"
        for e in events.items
    ])
    
    if not event_summary:
        return {"status": "healthy", "message": "No warning events detected"}
    
    result = await asyncio.get_event_loop().run_in_executor(
        None, analyze_event, f"정기 리스크 스캔\n수집된 경고 이벤트:\n{event_summary}"
    )
    return {"status": "risks_detected", "analysis": result}

# ── Wiki 검색 엔드포인트 ──────────────────────────────────────
@app.get("/search")
async def search_wiki(q: str, category: Optional[str] = None, limit: int = 5):
    """Lens에서 직접 Wiki 검색 (RAG 없이 순수 검색)"""
    from langchain_community.vectorstores import Milvus as MilvusVS
    from langchain_huggingface import HuggingFaceEmbeddings
    import os
    
    emb = HuggingFaceEmbeddings(model_name=os.getenv("EMBEDDING_MODEL_PATH", "/models/bge-m3"))
    vs = MilvusVS(embedding_function=emb, collection_name="platform_ops_wiki",
                  connection_args={"host": os.getenv("MILVUS_HOST"), "port": "19530"})
    
    expr = f'category == "{category}" and status == "active"' if category else 'status == "active"'
    results = vs.similarity_search_with_score(q, k=limit, expr=expr)
    
    return {"results": [
        {"doc_id": r.metadata.get("doc_id"), "score": s,
         "summary": r.metadata.get("summary"), "file": r.metadata.get("file_path")}
        for r, s in results
    ]}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)

4. Lens 연동 설정 (로컬 PC)

Lens에서 사내 AIOps API 서버와 연동하는 방법입니다.

4-1. Lens Extension 설정 (Prometheus Metrics 연동)

# lens-extension-config.yaml
# Lens > Extensions > AIOps 설정
aiops:
  apiEndpoint: "https://aiops-agent.internal.com"
  alertWebhook: "/webhook/alertmanager"
  searchEndpoint: "/search"
  
  # 사내 LLM API (Lens에서 직접 쿼리)
  llmApi:
    endpoint: "https://llm-internal.company.com/v1"
    model: "llama-3-70b"
    authHeader: "Bearer ${INTERNAL_LLM_TOKEN}"

4-2. Alertmanager 연동 설정

# alertmanager-config.yaml (기존 Alertmanager에 webhook 추가)
route:
  group_by: ['alertname', 'namespace']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'slack-default'
  routes:
  - match:
      severity: critical
    receiver: 'aiops-agent'    # Critical 알람은 AIOps 에이전트로

receivers:
- name: 'aiops-agent'
  webhook_configs:
  - url: 'http://aiops-agent-svc.platform-ops:8080/webhook/alertmanager'
    send_resolved: false
    
- name: 'slack-default'
  slack_configs:
  - api_url: 'https://hooks.slack.com/services/...'
    channel: '#platform-alerts'

5. Git 폴더 구조 (최종)

llm-wiki/                          ← Git 저장소 루트
├── raw/                           ← Confluence 원본 (수정 금지)
│   └── 2026-04-29/
│       ├── Cilium-BGP-Guide.md
│       └── MinIO-Scaling.md
│
├── wiki/
│   ├── active/                    ← 현재 운영 중인 지식 (RAG 대상)
│   │   ├── SOP/
│   │   │   ├── Network/
│   │   │   │   └── cilium-bgp-peering-recovery.md
│   │   │   └── Storage/
│   │   │       └── minio-node-expansion.md
│   │   ├── Library/
│   │   │   ├── Cilium/
│   │   │   └── K8s/
│   │   ├── Architecture/
│   │   └── Reports/
│   │
│   └── archive/                   ← 폐기된 구버전 문서 (RAG 제외)
│       └── SOP/Network/
│           └── cilium-1.14-bgp-old.md
│
├── scripts/                       ← 파이프라인 스크립트
│   ├── incremental_export.py
│   ├── processor.py
│   ├── lifecycle_manager.py
│   ├── indexer.py
│   └── sync_to_git.sh
│
└── .gitlab-ci.yml                 ← GitLab 전환 후 사용할 CI 설정

6. GitLab CI/CD (6개월 후 전환 대비)

# .gitlab-ci.yml
stages:
  - validate
  - index
  - notify

validate-lifecycle:
  stage: validate
  image: internal-reg.com/wiki-pipeline/processor:v1.2
  script:
    - python scripts/lifecycle_manager.py --action check --clone-dir .
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "wiki/**/*"

vector-indexing:
  stage: index
  image: internal-reg.com/wiki-pipeline/indexer:v1.2
  script:
    - python scripts/indexer.py
  variables:
    WIKI_DIR: "./wiki/active"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "wiki/active/**/*"

notify-update:
  stage: notify
  image: curlimages/curl
  script:
    - |
      CHANGED=$(git diff HEAD~1 --name-only wiki/ | wc -l)
      curl -X POST $SLACK_WEBHOOK_URL \
        -d "{\"text\":\"📚 Wiki 업데이트: ${CHANGED}개 문서 변경 (${CI_COMMIT_SHORT_SHA})\"}"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

7. 전체 운영 체크리스트

단계항목상태
인프라MinIO AIStor 버킷 생성 (confluence-wiki)
인프라Milvus K8s Operator 설치
인프라vLLM Pod 배포 (GPU 노드)
인프라BGE-M3 임베딩 모델 반입 (폐쇄망)
파이프라인Argo Workflows CronWorkflow 배포
파이프라인K8s Secret 생성 (Confluence/Git/MinIO 인증정보)
파이프라인첫 전체 추출 실행 (manual trigger)
인덱싱Vector DB 초기 인덱싱 완료 확인
모니터링AIOps API 서버 배포
모니터링Alertmanager Webhook 연동
로컬Lens AIOps Extension 설정
거버넌스Slack 알림 채널 설정 (180일 검증 알림)
전환GitLab .gitlab-ci.yml 준비 (6개월 후)

문서 ID: ARCH-OPS-001 | 작성일: 2026-04-29 | 버전: v1.0 | 검토자: Platform Lead

0개의 댓글