[AI] 모델 병렬구조 구체화 초기 작업

seongyun·2025년 6월 22일

Hancom Project

목록 보기
4/12

main_model.py

import asyncio
import torch
import time
import logging
from typing import Dict, List, Optional, Any
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ModelConfig:
    name: str
    adapter_path: str
    lora_id: int
    max_tokens: int
    temperature: float
    top_p: float

class VLLMMultiLoRAEngine:
    """vLLM 기반 4차 모델 병렬 추론 엔진"""
    
    def __init__(self, base_model_path: str, model_configs: Dict[str, ModelConfig]):
        self.base_model_path = base_model_path
        self.model_configs = model_configs
        self.llm = None
        self.performance_stats = {}
        
    def initialize_engine(self):
        """vLLM 엔진 초기화 - T4 최적화 설정"""[11]
        logger.info("🚀 vLLM 다중 LoRA 엔진 초기화 중...")
        
        try:
            # T4 환경 최적화 설정
            self.llm = LLM(
                model=self.base_model_path,
                enable_lora=True,
                max_loras=4,  # 4개 어댑터 동시 지원
                max_lora_rank=32,  # LoRA rank 설정
                max_cpu_loras=8,  # CPU 캐시 어댑터 수
                dtype="half",  # T4에서 bfloat16 대신 half 사용
                max_model_len=2048,  # T4 메모리 제약 고려
                gpu_memory_utilization=0.9,  # GPU 메모리 90% 활용
                tensor_parallel_size=1,  # 단일 GPU
                trust_remote_code=True
            )
            
            logger.info("✅ vLLM 엔진 초기화 완료")
            
        except Exception as e:
            logger.error(f"❌ vLLM 엔진 초기화 실패: {e}")
            raise
    
    def generate_parallel_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """병렬 추론 실행"""[11][13]
        logger.info(f"🔄 {len(requests)}개 요청 병렬 처리 시작...")
        
        # 요청별 LoRA 설정 준비
        prompts = []
        sampling_params_list = []
        lora_requests = []
        
        for req in requests:
            model_type = req['model_type']
            config = self.model_configs[model_type]
            
            prompts.append(req['prompt'])
            
            # 모델별 샘플링 파라미터
            sampling_params = SamplingParams(
                temperature=req.get('temperature', config.temperature),
                max_tokens=req.get('max_tokens', config.max_tokens),
                top_p=req.get('top_p', config.top_p),
                stop=req.get('stop', None)
            )
            sampling_params_list.append(sampling_params)
            
            # LoRA 요청 생성
            lora_request = LoRARequest(
                lora_name=config.name,
                lora_int_id=config.lora_id,
                lora_path=config.adapter_path
            )
            lora_requests.append(lora_request)
        
        # 병렬 추론 실행
        start_time = time.time()
        
        try:
            # vLLM의 배치 추론으로 모든 요청을 동시 처리
            outputs = self.llm.generate(
                prompts,
                sampling_params_list[0],  # 기본 샘플링 파라미터
                lora_request=lora_requests[0] if len(set(req['model_type'] for req in requests)) == 1 else None
            )
            
            total_time = time.time() - start_time
            
            # 결과 정리
            results = []
            for i, (req, output) in enumerate(zip(requests, outputs)):
                result = {
                    'request_id': req.get('request_id', i),
                    'model_type': req['model_type'],
                    'generated_text': output.outputs[0].text,
                    'input_tokens': len(output.prompt_token_ids),
                    'output_tokens': len(output.outputs[0].token_ids),
                    'tokens_per_second': len(output.outputs[0].token_ids) / total_time,
                    'finish_reason': output.outputs[0].finish_reason
                }
                results.append(result)
            
            logger.info(f"✅ 병렬 처리 완료 ({total_time:.2f}초)")
            return results
            
        except Exception as e:
            logger.error(f"❌ 병렬 추론 실패: {e}")
            raise
    
    def generate_mixed_batch_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """서로 다른 LoRA 어댑터를 사용하는 혼합 배치 처리"""[1][13]
        logger.info(f"🔄 혼합 배치 처리 시작 ({len(requests)}개 요청)...")
        
        results = []
        
        # 모델 타입별로 요청 그룹화
        grouped_requests = {}
        for req in requests:
            model_type = req['model_type']
            if model_type not in grouped_requests:
                grouped_requests[model_type] = []
            grouped_requests[model_type].append(req)
        
        # 각 모델 타입별로 배치 처리
        for model_type, model_requests in grouped_requests.items():
            config = self.model_configs[model_type]
            
            prompts = [req['prompt'] for req in model_requests]
            
            # 모델별 샘플링 파라미터
            sampling_params = SamplingParams(
                temperature=config.temperature,
                max_tokens=config.max_tokens,
                top_p=config.top_p
            )
            
            # LoRA 요청 생성
            lora_request = LoRARequest(
                lora_name=config.name,
                lora_int_id=config.lora_id,
                lora_path=config.adapter_path
            )
            
            start_time = time.time()
            
            # 배치 추론 실행
            outputs = self.llm.generate(
                prompts,
                sampling_params,
                lora_request=lora_request
            )
            
            batch_time = time.time() - start_time
            
            # 결과 정리
            for req, output in zip(model_requests, outputs):
                result = {
                    'request_id': req.get('request_id'),
                    'model_type': model_type,
                    'generated_text': output.outputs[0].text,
                    'input_tokens': len(output.prompt_token_ids),
                    'output_tokens': len(output.outputs[0].token_ids),
                    'tokens_per_second': len(output.outputs[0].token_ids) / batch_time,
                    'batch_time': batch_time,
                    'finish_reason': output.outputs[0].finish_reason
                }
                results.append(result)
        
        return results

class AsyncMultiLoRAServer:
    """비동기 다중 LoRA 서버"""
    
    def __init__(self, engine: VLLMMultiLoRAEngine):
        self.engine = engine
        self.request_queue = asyncio.Queue()
        self.response_futures = {}
        
    async def add_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """비동기 요청 추가"""
        request_id = request.get('request_id', time.time())
        future = asyncio.Future()
        
        self.response_futures[request_id] = future
        await self.request_queue.put(request)
        
        return await future
    
    async def process_requests_batch(self, batch_size: int = 4):
        """배치 단위로 요청 처리"""[5]
        while True:
            batch_requests = []
            
            # 배치 크기만큼 요청 수집
            for _ in range(batch_size):
                try:
                    request = await asyncio.wait_for(
                        self.request_queue.get(), 
                        timeout=1.0
                    )
                    batch_requests.append(request)
                except asyncio.TimeoutError:
                    break
            
            if not batch_requests:
                await asyncio.sleep(0.1)
                continue
            
            # 배치 처리 실행
            try:
                results = self.engine.generate_mixed_batch_responses(batch_requests)
                
                # 결과를 해당 Future에 설정
                for result in results:
                    request_id = result['request_id']
                    if request_id in self.response_futures:
                        self.response_futures[request_id].set_result(result)
                        del self.response_futures[request_id]
                        
            except Exception as e:
                # 오류 발생 시 모든 Future에 예외 설정
                for request in batch_requests:
                    request_id = request.get('request_id')
                    if request_id in self.response_futures:
                        self.response_futures[request_id].set_exception(e)
                        del self.response_futures[request_id]

class ParallelPerformanceTester:
    """병렬 성능 테스터"""
    
    def __init__(self, engine: VLLMMultiLoRAEngine):
        self.engine = engine
        
    def run_concurrent_benchmark(self, test_cases: Dict[str, str], 
                                concurrent_requests: int = 8, 
                                iterations: int = 3) -> Dict[str, Any]:
        """동시 요청 성능 벤치마크"""[4][7]
        logger.info(f"🧪 동시 요청 벤치마크 시작 ({concurrent_requests}개 동시 요청)")
        
        all_results = []
        
        for iteration in range(iterations):
            logger.info(f"반복 {iteration + 1}/{iterations}")
            
            # 동시 요청 생성
            requests = []
            for i in range(concurrent_requests):
                model_type = list(test_cases.keys())[i % len(test_cases)]
                prompt = test_cases[model_type]
                
                requests.append({
                    'request_id': f"{iteration}_{i}",
                    'model_type': model_type,
                    'prompt': prompt
                })
            
            # 병렬 실행
            start_time = time.time()
            results = self.engine.generate_mixed_batch_responses(requests)
            total_time = time.time() - start_time
            
            # 통계 수집
            for result in results:
                result['iteration'] = iteration + 1
                result['total_batch_time'] = total_time
                all_results.append(result)
        
        return self._analyze_results(all_results, concurrent_requests)
    
    def _analyze_results(self, results: List[Dict], concurrent_requests: int) -> Dict[str, Any]:
        """결과 분석"""
        import pandas as pd
        
        df = pd.DataFrame(results)
        
        analysis = {
            'total_requests': len(results),
            'concurrent_requests': concurrent_requests,
            'model_performance': {},
            'overall_performance': {
                'avg_tokens_per_second': df['tokens_per_second'].mean(),
                'avg_batch_time': df['total_batch_time'].mean(),
                'requests_per_second': len(results) / df['total_batch_time'].sum(),
                'max_batch_time': df['total_batch_time'].max(),
                'min_batch_time': df['total_batch_time'].min()
            }
        }
        
        # 모델별 성능 분석
        for model_type in df['model_type'].unique():
            model_data = df[df['model_type'] == model_type]
            analysis['model_performance'][model_type] = {
                'avg_tokens_per_second': model_data['tokens_per_second'].mean(),
                'avg_output_tokens': model_data['output_tokens'].mean(),
                'request_count': len(model_data)
            }
        
        return analysis

def create_model_configs() -> Dict[str, ModelConfig]:
    """4차 모델 설정 생성"""
    return {
        "autocomplete": ModelConfig(
            name="autocomplete",
            adapter_path="/home/ubuntu/deepseek-coder/models/autocomplete-finetuned",
            lora_id=1,
            max_tokens=128,
            temperature=0.3,
            top_p=0.9
        ),
        "prompt": ModelConfig(
            name="prompt",
            adapter_path="/home/ubuntu/deepseek-coder/models/prompt-finetuned",
            lora_id=2,
            max_tokens=512,
            temperature=0.6,
            top_p=0.95
        ),
        "comment": ModelConfig(
            name="comment",
            adapter_path="/home/ubuntu/deepseek-coder/models/comment-finetuned",
            lora_id=3,
            max_tokens=256,
            temperature=0.4,
            top_p=0.9
        ),
        "error_fix": ModelConfig(
            name="error_fix",
            adapter_path="/home/ubuntu/deepseek-coder/models/error-fix-finetuned",
            lora_id=4,
            max_tokens=512,
            temperature=0.5,
            top_p=0.9
        )
    }

# 메인 실행 코드
if __name__ == "__main__":
    # 설정
    base_model_path = "/home/ubuntu/deepseek-coder/models/base-model"
    model_configs = create_model_configs()
    
    # 엔진 초기화
    engine = VLLMMultiLoRAEngine(base_model_path, model_configs)
    engine.initialize_engine()
    
    # 테스트 케이스
    test_cases = {
        "autocomplete": "def fibonacci(n):",
        "prompt": "Write a Python function to calculate factorial",
        "comment": "# Sort a list using quicksort algorithm",
        "error_fix": "Fix this code: def hello(\n    print('Hello')"
    }
    
    # 성능 테스트
    tester = ParallelPerformanceTester(engine)
    results = tester.run_concurrent_benchmark(test_cases, concurrent_requests=8)
    
    print("\n📊 병렬 성능 분석 결과:")
    print(json.dumps(results, indent=2, ensure_ascii=False))

base_model.py

import asyncio
import torch
import time
import logging
from typing import Dict, List, Optional, Any
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import numpy as np
from dataclasses import dataclass
import re

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DynamicModelConfig:
    name: str
    adapter_path: str
    lora_id: int
    base_tokens: int
    max_tokens: int
    temperature: float
    top_p: float
    priority_weight: float
    usage_history: List[float]

@dataclass
class TokenBudget:
    request_id: str
    model_type: str
    estimated_tokens: int
    allocated_tokens: int
    priority_score: float
    complexity_level: str

class IntelligentPromptRouter:
    """지능형 프롬프트 라우터 - AWS 다중 LLM 라우팅 방식 적용"""
    
    def __init__(self):
        self.routing_patterns = {
            'autocomplete': [
                r'def\s+\w+\(',
                r'class\s+\w+',
                r'for\s+\w+\s+in',
                r'if\s+\w+.*:$',
                r'import\s+\w+',
                r'from\s+\w+\s+import'
            ],
            'prompt': [
                r'write\s+a\s+function',
                r'create\s+a\s+class',
                r'implement\s+.*algorithm',
                r'generate\s+code',
                r'build\s+a\s+program',
                r'develop\s+.*'
            ],
            'comment': [
                r'^#\s+.*',
                r'""".*"""',
                r"'''.*'''",
                r'//\s+.*',
                r'/\*.*\*/'
            ],
            'error_fix': [
                r'fix\s+this\s+code',
                r'error\s+in\s+.*',
                r'debug\s+.*',
                r'syntax\s+error',
                r'correct\s+.*',
                r'repair\s+.*'
            ]
        }
        self.routing_history = {}
        self.performance_metrics = {}
    
    def classify_prompt(self, prompt: str) -> str:
        """프롬프트 분류 및 최적 모델 선택"""[7]
        prompt_lower = prompt.lower().strip()
        scores = {}
        
        # 패턴 매칭 점수 계산
        for model_type, patterns in self.routing_patterns.items():
            score = 0
            for pattern in patterns:
                if re.search(pattern, prompt_lower, re.IGNORECASE | re.MULTILINE):
                    score += 1
            scores[model_type] = score
        
        # 성능 히스토리 반영
        for model_type in scores:
            if model_type in self.performance_metrics:
                performance_weight = self.performance_metrics[model_type].get('success_rate', 0.5)
                scores[model_type] *= (1 + performance_weight * 0.3)
        
        # 가장 높은 점수의 모델 선택
        if max(scores.values()) > 0:
            return max(scores, key=scores.get)
        
        # 기본값: 프롬프트 길이 기반 선택
        if len(prompt) < 50:
            return 'autocomplete'
        else:
            return 'prompt'
    
    def update_performance_metrics(self, model_type: str, success: bool, response_time: float):
        """성능 메트릭 업데이트"""
        if model_type not in self.performance_metrics:
            self.performance_metrics[model_type] = {
                'total_requests': 0,
                'success_count': 0,
                'avg_response_time': 0
            }
        
        metrics = self.performance_metrics[model_type]
        metrics['total_requests'] += 1
        if success:
            metrics['success_count'] += 1
        
        metrics['success_rate'] = metrics['success_count'] / metrics['total_requests']
        metrics['avg_response_time'] = (metrics['avg_response_time'] * (metrics['total_requests'] - 1) + 
                                       response_time) / metrics['total_requests']

class DynamicTokenAllocator:
    """사용 패턴 기반 동적 토큰 할당기"""
    
    def __init__(self, total_token_budget: int = 2048):
        self.total_token_budget = total_token_budget
        self.usage_stats = {}
        self.allocation_history = []
    
    def estimate_token_budget(self, prompt: str, model_type: str) -> TokenBudget:
        """SelfBudgeter 방식의 토큰 버젯 추정"""[2]
        # 프롬프트 복잡도 분석
        complexity_indicators = {
            'length': len(prompt.split()),
            'code_blocks': prompt.count('```'),
            'functions': prompt.count('def '),
            'classes': prompt.count('class '),
            'loops': prompt.count('for ') + prompt.count('while '),
            'conditionals': prompt.count('if ') + prompt.count('elif ')
        }
        
        # 복잡도 레벨 결정
        complexity_score = sum(complexity_indicators.values())
        if complexity_score < 10:
            complexity_level = "simple"
            base_tokens = 64
        elif complexity_score < 30:
            complexity_level = "medium"
            base_tokens = 128
        else:
            complexity_level = "complex"
            base_tokens = 256
        
        # 모델별 조정
        model_multipliers = {
            "autocomplete": 0.5,
            "prompt": 1.0,
            "comment": 0.7,
            "error_fix": 0.8
        }
        
        estimated_tokens = int(base_tokens * model_multipliers.get(model_type, 1.0))
        
        # 우선순위 점수 계산
        priority_score = self._calculate_priority(model_type, complexity_level)
        
        return TokenBudget(
            request_id=f"{time.time()}_{model_type}",
            model_type=model_type,
            estimated_tokens=estimated_tokens,
            allocated_tokens=estimated_tokens,
            priority_score=priority_score,
            complexity_level=complexity_level
        )
    
    def _calculate_priority(self, model_type: str, complexity_level: str) -> float:
        """우선순위 점수 계산"""
        base_priorities = {
            "autocomplete": 1.5,  # 실시간 응답 필요
            "prompt": 1.0,
            "comment": 0.8,
            "error_fix": 1.2
        }
        
        complexity_weights = {
            "simple": 1.2,  # 간단한 작업 우선
            "medium": 1.0,
            "complex": 0.8
        }
        
        # 최근 사용 통계 반영
        usage_weight = 1.0
        if model_type in self.usage_stats:
            recent_usage = self.usage_stats[model_type].get('recent_usage', [])
            if recent_usage:
                avg_recent_usage = np.mean(recent_usage[-5:])  # 최근 5개 평균
                usage_weight = 1.0 + (avg_recent_usage / 100.0)  # 사용량에 비례
        
        return (base_priorities.get(model_type, 1.0) * 
                complexity_weights.get(complexity_level, 1.0) * 
                usage_weight)
    
    def allocate_dynamic_budget(self, requests: List[TokenBudget]) -> Dict[str, int]:
        """동적 토큰 버젯 할당"""[1]
        # 우선순위 기반 정렬
        sorted_requests = sorted(requests, key=lambda x: x.priority_score, reverse=True)
        
        allocation = {}
        remaining_budget = self.total_token_budget
        
        for request in sorted_requests:
            # 최소 보장 토큰
            min_tokens = max(32, request.estimated_tokens // 2)
            
            # 사용 가능한 토큰 계산
            available_tokens = min(request.estimated_tokens, remaining_budget)
            allocated = max(min_tokens, available_tokens)
            
            allocation[request.request_id] = allocated
            remaining_budget -= allocated
            
            if remaining_budget <= 0:
                break
        
        return allocation
    
    def update_usage_stats(self, model_type: str, tokens_used: int, 
                          response_time: float, success: bool):
        """모델별 사용 통계 업데이트"""
        if model_type not in self.usage_stats:
            self.usage_stats[model_type] = {
                'total_requests': 0,
                'avg_tokens': 0,
                'avg_response_time': 0,
                'success_rate': 0,
                'recent_usage': []
            }
        
        stats = self.usage_stats[model_type]
        stats['total_requests'] += 1
        stats['avg_tokens'] = (stats['avg_tokens'] * (stats['total_requests'] - 1) + 
                              tokens_used) / stats['total_requests']
        stats['avg_response_time'] = (stats['avg_response_time'] * (stats['total_requests'] - 1) + 
                                     response_time) / stats['total_requests']
        stats['success_rate'] = (stats['success_rate'] * (stats['total_requests'] - 1) + 
                                (1 if success else 0)) / stats['total_requests']
        
        # 최근 사용량 추적 (최대 10개)
        stats['recent_usage'].append(tokens_used)
        if len(stats['recent_usage']) > 10:
            stats['recent_usage'].pop(0)

class EnhancedVLLMMultiLoRAEngine:
    """동적 토큰 할당과 지능형 라우팅이 통합된 vLLM 기반 4차 모델 병렬 추론 엔진"""
    
    def __init__(self, base_model_path: str, model_configs: Dict[str, DynamicModelConfig]):
        self.base_model_path = base_model_path
        self.model_configs = model_configs
        self.llm = None
        self.performance_stats = {}
        
        # 동적 컴포넌트 초기화
        self.router = IntelligentPromptRouter()
        self.allocator = DynamicTokenAllocator()
        self.prefix_cache = {}
        
    def initialize_engine(self):
        """vLLM 엔진 초기화 - T4 최적화 설정"""[2]
        logger.info("🚀 Enhanced vLLM 다중 LoRA 엔진 초기화 중...")
        
        try:
            # T4 환경 최적화 설정
            self.llm = LLM(
                model=self.base_model_path,
                enable_lora=True,
                max_loras=4,  # 4개 어댑터 동시 지원
                max_lora_rank=32,  # LoRA rank 설정
                max_cpu_loras=8,  # CPU 캐시 어댑터 수
                dtype="half",  # T4에서 bfloat16 대신 half 사용
                max_model_len=2048,  # T4 메모리 제약 고려
                gpu_memory_utilization=0.9,  # GPU 메모리 90% 활용
                tensor_parallel_size=1,  # 단일 GPU
                trust_remote_code=True,
                # 동적 배칭 설정
                enable_chunked_prefill=True,
                max_num_batched_tokens=512,  # 동적 조정 가능
                enable_prefix_caching=True
            )
            
            logger.info("✅ Enhanced vLLM 엔진 초기화 완료")
            
        except Exception as e:
            logger.error(f"❌ vLLM 엔진 초기화 실패: {e}")
            raise
    
    def generate_adaptive_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """적응형 응답 생성 - 자동 라우팅 + 동적 토큰 할당"""[3][4]
        logger.info(f"🔄 {len(requests)}개 요청 적응형 처리 시작...")
        
        # 1. 자동 라우팅 적용
        for req in requests:
            if 'model_type' not in req:
                req['model_type'] = self.router.classify_prompt(req['prompt'])
        
        # 2. 토큰 버젯 추정
        token_budgets = []
        for req in requests:
            budget = self.allocator.estimate_token_budget(req['prompt'], req['model_type'])
            budget.request_id = req.get('request_id', budget.request_id)
            token_budgets.append(budget)
        
        # 3. 동적 토큰 할당
        allocation = self.allocator.allocate_dynamic_budget(token_budgets)
        
        # 4. 프리픽스 기반 그룹화
        prefix_groups = self._group_by_prefix(requests)
        
        results = []
        
        for prefix, group_requests in prefix_groups.items():
            batch_results = self._execute_adaptive_batch(
                group_requests, allocation, prefix, token_budgets
            )
            results.extend(batch_results)
        
        return results
    
    def _group_by_prefix(self, requests: List[Dict]) -> Dict[str, List[Dict]]:
        """공통 프리픽스 기반 요청 그룹화"""[3]
        groups = {}
        
        for req in requests:
            # 간단한 프리픽스 추출
            prompt = req['prompt']
            prefix = prompt[:min(50, len(prompt))]
            
            if prefix not in groups:
                groups[prefix] = []
            groups[prefix].append(req)
        
        return groups
    
    def _execute_adaptive_batch(self, requests: List[Dict], allocation: Dict[str, int], 
                               prefix: str, token_budgets: List[TokenBudget]) -> List[Dict]:
        """적응형 배치 실행"""[1]
        results = []
        
        # 모델별 그룹화
        model_groups = {}
        budget_map = {b.request_id: b for b in token_budgets}
        
        for req in requests:
            model_type = req['model_type']
            if model_type not in model_groups:
                model_groups[model_type] = []
            model_groups[model_type].append(req)
        
        for model_type, model_requests in model_groups.items():
            config = self.model_configs[model_type]
            prompts = [req['prompt'] for req in model_requests]
            
            # 동적 토큰 할당 적용
            allocated_tokens = []
            for req in model_requests:
                request_id = req.get('request_id')
                budget = budget_map.get(request_id)
                if budget:
                    tokens = allocation.get(budget.request_id, config.base_tokens)
                else:
                    tokens = config.base_tokens
                allocated_tokens.append(tokens)
            
            # 평균 할당량으로 샘플링 파라미터 설정
            avg_allocation = int(np.mean(allocated_tokens))
            
            # 동적 샘플링 파라미터
            sampling_params = SamplingParams(
                temperature=config.temperature,
                max_tokens=avg_allocation,
                top_p=config.top_p,
                stop=None
            )
            
            # LoRA 요청 생성
            lora_request = LoRARequest(
                lora_name=config.name,
                lora_int_id=config.lora_id,
                lora_path=config.adapter_path
            )
            
            start_time = time.time()
            
            try:
                # vLLM 배치 추론 실행
                outputs = self.llm.generate(
                    prompts,
                    sampling_params,
                    lora_request=lora_request
                )
                
                processing_time = time.time() - start_time
                
                for req, output, allocated in zip(model_requests, outputs, allocated_tokens):
                    result = {
                        'request_id': req.get('request_id'),
                        'model_type': model_type,
                        'generated_text': output.outputs.text,
                        'input_tokens': len(output.prompt_token_ids),
                        'output_tokens': len(output.outputs.token_ids),
                        'allocated_tokens': allocated,
                        'tokens_per_second': len(output.outputs.token_ids) / processing_time,
                        'processing_time': processing_time / len(model_requests),
                        'finish_reason': output.outputs.finish_reason,
                        'prefix_cached': prefix in self.prefix_cache,
                        'auto_routed': True
                    }
                    results.append(result)
                    
                    # 통계 업데이트
                    self.allocator.update_usage_stats(
                        model_type,
                        len(output.outputs.token_ids),
                        processing_time / len(model_requests),
                        True
                    )
                    
                    self.router.update_performance_metrics(
                        model_type,
                        True,
                        processing_time / len(model_requests)
                    )
                
                # 프리픽스 캐시 업데이트
                self.prefix_cache[prefix] = time.time()
                
            except Exception as e:
                logger.error(f"배치 처리 실패 ({model_type}): {e}")
                for req in model_requests:
                    self.allocator.update_usage_stats(model_type, 0, 0, False)
                    self.router.update_performance_metrics(model_type, False, 0)
        
        return results
    
    def get_system_status(self) -> Dict[str, Any]:
        """시스템 상태 조회"""
        return {
            'router_performance': self.router.performance_metrics,
            'allocator_stats': self.allocator.usage_stats,
            'prefix_cache_size': len(self.prefix_cache),
            'total_token_budget': self.allocator.total_token_budget
        }

# 기존 클래스들과 통합
class EnhancedAsyncMultiLoRAServer:
    """향상된 비동기 다중 LoRA 서버"""
    
    def __init__(self, engine: EnhancedVLLMMultiLoRAEngine):
        self.engine = engine
        self.request_queue = asyncio.Queue()
        self.response_futures = {}
        
    async def add_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """비동기 요청 추가"""
        request_id = request.get('request_id', f"{time.time()}_{hash(request['prompt']) % 1000}")
        request['request_id'] = request_id
        future = asyncio.Future()
        
        self.response_futures[request_id] = future
        await self.request_queue.put(request)
        
        return await future
    
    async def process_adaptive_batch(self, batch_size: int = 4):
        """적응형 배치 처리"""[5]
        while True:
            batch_requests = []
            
            # 배치 크기만큼 요청 수집
            for _ in range(batch_size):
                try:
                    request = await asyncio.wait_for(
                        self.request_queue.get(), 
                        timeout=1.0
                    )
                    batch_requests.append(request)
                except asyncio.TimeoutError:
                    break
            
            if not batch_requests:
                await asyncio.sleep(0.1)
                continue
            
            # 적응형 배치 처리 실행
            try:
                results = self.engine.generate_adaptive_responses(batch_requests)
                
                # 결과를 해당 Future에 설정
                for result in results:
                    request_id = result['request_id']
                    if request_id in self.response_futures:
                        self.response_futures[request_id].set_result(result)
                        del self.response_futures[request_id]
                        
            except Exception as e:
                # 오류 발생 시 모든 Future에 예외 설정
                for request in batch_requests:
                    request_id = request.get('request_id')
                    if request_id in self.response_futures:
                        self.response_futures[request_id].set_exception(e)
                        del self.response_futures[request_id]

class EnhancedParallelPerformanceTester:
    """향상된 병렬 성능 테스터"""
    
    def __init__(self, engine: EnhancedVLLMMultiLoRAEngine):
        self.engine = engine
        
    def run_adaptive_benchmark(self, test_cases: Dict[str, str], 
                              concurrent_requests: int = 8, 
                              iterations: int = 3) -> Dict[str, Any]:
        """적응형 벤치마크 실행"""[5]
        logger.info(f"🧪 적응형 벤치마크 시작 ({concurrent_requests}개 동시 요청)")
        
        all_results = []
        
        for iteration in range(iterations):
            logger.info(f"반복 {iteration + 1}/{iterations}")
            
            # 다양한 요청 생성 (자동 라우팅 테스트)
            requests = []
            for i in range(concurrent_requests):
                # 모델 타입을 명시하지 않고 자동 라우팅 테스트
                test_prompts = list(test_cases.values())
                prompt = test_prompts[i % len(test_prompts)]
                
                requests.append({
                    'request_id': f"adaptive_{iteration}_{i}",
                    'prompt': prompt
                    # model_type 생략 - 자동 라우팅
                })
            
            # 적응형 병렬 실행
            start_time = time.time()
            results = self.engine.generate_adaptive_responses(requests)
            total_time = time.time() - start_time
            
            # 통계 수집
            for result in results:
                result['iteration'] = iteration + 1
                result['total_batch_time'] = total_time
                all_results.append(result)
        
        return self._analyze_adaptive_results(all_results, concurrent_requests)
    
    def _analyze_adaptive_results(self, results: List[Dict], concurrent_requests: int) -> Dict[str, Any]:
        """적응형 결과 분석"""
        import pandas as pd
        
        df = pd.DataFrame(results)
        
        analysis = {
            'total_requests': len(results),
            'concurrent_requests': concurrent_requests,
            'auto_routing_accuracy': df['auto_routed'].sum() / len(df),
            'model_performance': {},
            'token_allocation_efficiency': {},
            'overall_performance': {
                'avg_tokens_per_second': df['tokens_per_second'].mean(),
                'avg_batch_time': df['total_batch_time'].mean(),
                'requests_per_second': len(results) / df['total_batch_time'].sum(),
                'avg_allocated_tokens': df['allocated_tokens'].mean(),
                'token_utilization_rate': df['output_tokens'].sum() / df['allocated_tokens'].sum()
            }
        }
        
        # 모델별 성능 분석
        for model_type in df['model_type'].unique():
            model_data = df[df['model_type'] == model_type]
            analysis['model_performance'][model_type] = {
                'avg_tokens_per_second': model_data['tokens_per_second'].mean(),
                'avg_output_tokens': model_data['output_tokens'].mean(),
                'avg_allocated_tokens': model_data['allocated_tokens'].mean(),
                'request_count': len(model_data),
                'token_efficiency': model_data['output_tokens'].sum() / model_data['allocated_tokens'].sum()
            }
        
        return analysis

def create_enhanced_model_configs() -> Dict[str, DynamicModelConfig]:
    """향상된 4차 모델 설정 생성"""
    return {
        "autocomplete": DynamicModelConfig(
            name="autocomplete",
            adapter_path="/home/ubuntu/deepseek-coder/models/autocomplete-finetuned",
            lora_id=1,
            base_tokens=64,
            max_tokens=256,
            temperature=0.3,
            top_p=0.9,
            priority_weight=1.5,
            usage_history=[]
        ),
        "prompt": DynamicModelConfig(
            name="prompt",
            adapter_path="/home/ubuntu/deepseek-coder/models/prompt-finetuned",
            lora_id=2,
            base_tokens=128,
            max_tokens=512,
            temperature=0.6,
            top_p=0.95,
            priority_weight=1.0,
            usage_history=[]
        ),
        "comment": DynamicModelConfig(
            name="comment",
            adapter_path="/home/ubuntu/deepseek-coder/models/comment-finetuned",
            lora_id=3,
            base_tokens=96,
            max_tokens=384,
            temperature=0.4,
            top_p=0.9,
            priority_weight=0.8,
            usage_history=[]
        ),
        "error_fix": DynamicModelConfig(
            name="error_fix",
            adapter_path="/home/ubuntu/deepseek-coder/models/error-fix-finetuned",
            lora_id=4,
            base_tokens=128,
            max_tokens=512,
            temperature=0.5,
            top_p=0.9,
            priority_weight=1.2,
            usage_history=[]
        )
    }

# 메인 실행 코드
if __name__ == "__main__":
    # 설정
    base_model_path = "/home/ubuntu/deepseek-coder/models/base-model"
    model_configs = create_enhanced_model_configs()
    
    # 향상된 엔진 초기화
    engine = EnhancedVLLMMultiLoRAEngine(base_model_path, model_configs)
    engine.initialize_engine()
    
    # 테스트 케이스 (모델 타입 명시하지 않음 - 자동 라우팅 테스트)
    test_cases = {
        "auto_complete_test": "def fibonacci(n):",
        "prompt_test": "Write a Python function to calculate factorial",
        "comment_test": "# Sort a list using quicksort algorithm",
        "error_fix_test": "Fix this code: def hello(\n    print('Hello')"
    }
    
    # 적응형 성능 테스트
    tester = EnhancedParallelPerformanceTester(engine)
    results = tester.run_adaptive_benchmark(test_cases, concurrent_requests=8)
    
    print("\n📊 적응형 병렬 성능 분석 결과:")
    print(json.dumps(results, indent=2, ensure_ascii=False))
    
    # 시스템 상태 출력
    print("\n🔍 시스템 상태:")
    status = engine.get_system_status()
    print(json.dumps(status, indent=2, ensure_ascii=False))

초기에 작업된 코드라 현재 진행 상황에서는 많이 변경됐음.

0개의 댓글