import asyncio
import torch
import time
import logging
from typing import Dict, List, Optional, Any
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from dataclasses import dataclass
# Configure root logging at import time and create the module-level logger
# that the classes below use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ModelConfig:
    """Static settings for one LoRA adapter served on top of the base model.

    The engine reads ``name``, ``lora_id`` and ``adapter_path`` when it builds
    the adapter request, and uses the remaining fields as default sampling
    parameters for that adapter.
    """

    name: str           # adapter name passed to the LoRA request
    adapter_path: str   # filesystem path of the fine-tuned adapter
    lora_id: int        # integer id passed to the LoRA request
    max_tokens: int     # default limit on generated tokens
    temperature: float  # default sampling temperature
    top_p: float        # default nucleus-sampling threshold
class VLLMMultiLoRAEngine:
    """vLLM-based engine that runs batched inference over several LoRA adapters.

    A single base model is loaded once; every request selects an adapter
    through its ``model_type`` key, which indexes ``model_configs``.
    """

    def __init__(self, base_model_path: str, model_configs: "Dict[str, ModelConfig]"):
        """Store the configuration; the vLLM engine is built by ``initialize_engine``.

        Args:
            base_model_path: Path of the base model handed to ``LLM``.
            model_configs: Adapter settings keyed by model type.
        """
        self.base_model_path = base_model_path
        self.model_configs = model_configs
        self.llm = None  # created by initialize_engine()
        self.performance_stats = {}

    def initialize_engine(self):
        """Create the vLLM ``LLM`` instance with LoRA support enabled.

        Raises:
            Exception: Any error raised by ``LLM`` is logged and re-raised.
        """
        logger.info("🚀 vLLM 다중 LoRA 엔진 초기화 중...")
        try:
            # Settings tuned for a T4 GPU
            self.llm = LLM(
                model=self.base_model_path,
                enable_lora=True,
                max_loras=4,                 # up to four adapters active at once
                max_lora_rank=32,            # LoRA rank limit
                max_cpu_loras=8,             # adapters kept in the CPU cache
                dtype="half",                # half precision instead of bfloat16 on T4
                max_model_len=2048,          # bounded by T4 memory
                gpu_memory_utilization=0.9,  # use 90% of GPU memory
                tensor_parallel_size=1,      # single GPU
                trust_remote_code=True
            )
            logger.info("✅ vLLM 엔진 초기화 완료")
        except Exception as e:
            logger.error(f"❌ vLLM 엔진 초기화 실패: {e}")
            raise

    def _require_engine(self):
        """Fail with a clear message when ``initialize_engine`` was never called."""
        if self.llm is None:
            raise RuntimeError(
                "vLLM engine is not initialized; call initialize_engine() first"
            )

    @staticmethod
    def _lora_request_for(config):
        """Build the LoRA request that selects ``config``'s adapter."""
        return LoRARequest(
            lora_name=config.name,
            lora_int_id=config.lora_id,
            lora_path=config.adapter_path
        )

    def generate_parallel_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Run all requests in one vLLM batch, each with its own adapter and sampling.

        Each request is a dict with ``model_type`` and ``prompt`` and may
        override ``temperature``, ``max_tokens``, ``top_p`` and ``stop``.

        Args:
            requests: Requests to run; an empty list yields an empty result.

        Returns:
            One result dict per request, in request order.

        Raises:
            RuntimeError: If the engine has not been initialized.
            KeyError: If a request names an unknown ``model_type``.
            Exception: Inference errors are logged and re-raised.
        """
        if not requests:
            return []
        self._require_engine()
        logger.info(f"🔄 {len(requests)}개 요청 병렬 처리 시작...")

        # Build per-request prompts, sampling parameters and adapter selections
        prompts = []
        sampling_params_list = []
        lora_requests = []
        for req in requests:
            config = self.model_configs[req['model_type']]
            prompts.append(req['prompt'])
            sampling_params_list.append(SamplingParams(
                temperature=req.get('temperature', config.temperature),
                max_tokens=req.get('max_tokens', config.max_tokens),
                top_p=req.get('top_p', config.top_p),
                stop=req.get('stop', None)
            ))
            lora_requests.append(self._lora_request_for(config))

        start_time = time.time()
        try:
            # Pass the full lists so every prompt is paired with its own
            # sampling parameters and adapter. Using only the first entry (or
            # no adapter at all for mixed batches) would silently run requests
            # with the wrong settings.
            outputs = self.llm.generate(
                prompts,
                sampling_params_list,
                lora_request=lora_requests
            )
            total_time = time.time() - start_time

            results = []
            for i, (req, output) in enumerate(zip(requests, outputs)):
                completion = output.outputs[0]
                output_tokens = len(completion.token_ids)
                results.append({
                    'request_id': req.get('request_id', i),
                    'model_type': req['model_type'],
                    'generated_text': completion.text,
                    'input_tokens': len(output.prompt_token_ids),
                    'output_tokens': output_tokens,
                    # Guard against a zero elapsed time on coarse clocks
                    'tokens_per_second': output_tokens / total_time if total_time > 0 else 0.0,
                    'finish_reason': completion.finish_reason
                })
            logger.info(f"✅ 병렬 처리 완료 ({total_time:.2f}초)")
            return results
        except Exception as e:
            logger.error(f"❌ 병렬 추론 실패: {e}")
            raise

    def generate_mixed_batch_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Group requests by model type and run one vLLM batch per adapter.

        Sampling parameters come from the adapter's configuration only.

        Args:
            requests: Dicts with ``model_type`` and ``prompt`` (``request_id``
                optional); an empty list yields an empty result.

        Returns:
            One result dict per request, ordered by model-type group rather
            than by original position.

        Raises:
            RuntimeError: If the engine has not been initialized.
            KeyError: If a request names an unknown ``model_type``.
        """
        if not requests:
            return []
        self._require_engine()
        logger.info(f"🔄 혼합 배치 처리 시작 ({len(requests)}개 요청)...")

        # Group requests by model type, keeping first-seen order
        grouped_requests = {}
        for req in requests:
            grouped_requests.setdefault(req['model_type'], []).append(req)

        results = []
        for model_type, model_requests in grouped_requests.items():
            config = self.model_configs[model_type]
            prompts = [req['prompt'] for req in model_requests]
            sampling_params = SamplingParams(
                temperature=config.temperature,
                max_tokens=config.max_tokens,
                top_p=config.top_p
            )
            lora_request = self._lora_request_for(config)

            start_time = time.time()
            outputs = self.llm.generate(
                prompts,
                sampling_params,
                lora_request=lora_request
            )
            batch_time = time.time() - start_time

            for req, output in zip(model_requests, outputs):
                completion = output.outputs[0]
                output_tokens = len(completion.token_ids)
                results.append({
                    'request_id': req.get('request_id'),
                    'model_type': model_type,
                    'generated_text': completion.text,
                    'input_tokens': len(output.prompt_token_ids),
                    'output_tokens': output_tokens,
                    'tokens_per_second': output_tokens / batch_time if batch_time > 0 else 0.0,
                    'batch_time': batch_time,
                    'finish_reason': completion.finish_reason
                })
        return results
class AsyncMultiLoRAServer:
    """Asynchronous front end that batches queued requests for the LoRA engine.

    Callers await ``add_request``; a background task running
    ``process_requests_batch`` drains the queue and resolves their futures.
    """

    def __init__(self, engine: "VLLMMultiLoRAEngine"):
        """Wrap ``engine``, which must provide ``generate_mixed_batch_responses``."""
        self.engine = engine
        self.request_queue = asyncio.Queue()
        self.response_futures = {}  # request_id -> Future awaiting the result

    async def add_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Queue ``request`` and wait for its result.

        A missing ``request_id`` is generated and written back into
        ``request``: the engine echoes that key in its result, and it is the
        only way the result can be matched to the waiting future.

        Returns:
            The engine's result dict for this request.

        Raises:
            Exception: The batch failure, if the batch containing this request failed.
        """
        import uuid  # local import keeps this block self-contained

        request_id = request.get('request_id')
        if request_id is None:
            request_id = uuid.uuid4().hex
            request['request_id'] = request_id

        future = asyncio.get_running_loop().create_future()
        self.response_futures[request_id] = future
        try:
            await self.request_queue.put(request)
            return await future
        finally:
            # Drop the entry even when the caller is cancelled, so it cannot leak
            self.response_futures.pop(request_id, None)

    async def _collect_batch(self, batch_size: int) -> List[Dict[str, Any]]:
        """Wait for one request, then gather up to ``batch_size`` in total.

        Each further slot waits at most one second; the first timeout closes
        the batch.
        """
        batch = [await self.request_queue.get()]
        while len(batch) < batch_size:
            try:
                batch.append(
                    await asyncio.wait_for(self.request_queue.get(), timeout=1.0)
                )
            except asyncio.TimeoutError:
                break
        return batch

    def _fail_pending(self, batch_requests: List[Dict[str, Any]], error: Exception):
        """Set ``error`` on every still-unresolved future of ``batch_requests``."""
        for request in batch_requests:
            future = self.response_futures.pop(request.get('request_id'), None)
            if future is not None and not future.done():
                future.set_exception(error)

    async def process_requests_batch(self, batch_size: int = 4):
        """Run forever, serving queued requests in batches.

        Args:
            batch_size: Maximum number of requests per engine call.
        """
        loop = asyncio.get_running_loop()
        while True:
            batch_requests = await self._collect_batch(batch_size)
            try:
                # The engine call blocks; run it in a worker thread so the
                # event loop keeps accepting new requests meanwhile.
                results = await loop.run_in_executor(
                    None, self.engine.generate_mixed_batch_responses, batch_requests
                )
            except Exception as e:
                # On failure, propagate the error to every waiter in the batch
                self._fail_pending(batch_requests, e)
                continue

            # Deliver each result to the future registered for its request
            for result in results:
                future = self.response_futures.pop(result.get('request_id'), None)
                if future is not None and not future.done():
                    future.set_result(result)

            # A request without a result would otherwise wait forever
            self._fail_pending(
                batch_requests,
                RuntimeError("engine returned no result for this request")
            )
class ParallelPerformanceTester:
    """Benchmark helper that drives the engine with synthetic concurrent load."""

    def __init__(self, engine: "VLLMMultiLoRAEngine"):
        """Wrap ``engine``, which must provide ``generate_mixed_batch_responses``."""
        self.engine = engine

    def run_concurrent_benchmark(self, test_cases: Dict[str, str],
                                 concurrent_requests: int = 8,
                                 iterations: int = 3) -> Dict[str, Any]:
        """Send ``concurrent_requests`` prompts per iteration and summarize timings.

        Args:
            test_cases: Prompt per model type; requests cycle through the keys.
            concurrent_requests: Number of requests submitted per iteration.
            iterations: Number of repetitions.

        Returns:
            The summary produced by ``_analyze_results``.

        Raises:
            ValueError: If ``test_cases`` is empty.
        """
        if not test_cases:
            raise ValueError("test_cases must contain at least one prompt")
        logger.info(f"🧪 동시 요청 벤치마크 시작 ({concurrent_requests}개 동시 요청)")

        model_types = list(test_cases)  # built once instead of once per request
        all_results = []
        for iteration in range(iterations):
            logger.info(f"반복 {iteration + 1}/{iterations}")

            # Build this iteration's requests, cycling through the model types
            requests = []
            for i in range(concurrent_requests):
                model_type = model_types[i % len(model_types)]
                requests.append({
                    'request_id': f"{iteration}_{i}",
                    'model_type': model_type,
                    'prompt': test_cases[model_type]
                })

            start_time = time.time()
            results = self.engine.generate_mixed_batch_responses(requests)
            total_time = time.time() - start_time

            # Tag every result with its iteration and the iteration's wall time
            for result in results:
                result['iteration'] = iteration + 1
                result['total_batch_time'] = total_time
                all_results.append(result)
        return self._analyze_results(all_results, concurrent_requests)

    def _analyze_results(self, results: List[Dict], concurrent_requests: int) -> Dict[str, Any]:
        """Aggregate per-request results into overall and per-model statistics.

        ``total_batch_time`` is repeated on every row of an iteration, so batch
        statistics are computed once per iteration. Summing it over all rows
        would count each batch ``concurrent_requests`` times and under-report
        throughput.

        Returns:
            A dict of plain Python numbers. For empty ``results`` the
            ``overall_performance`` and ``model_performance`` entries are empty.
        """
        import pandas as pd

        analysis = {
            'total_requests': len(results),
            'concurrent_requests': concurrent_requests,
            'model_performance': {},
            'overall_performance': {}
        }
        if not results:
            return analysis

        df = pd.DataFrame(results)
        if 'iteration' in df.columns:
            batch_times = df.groupby('iteration')['total_batch_time'].first()
        else:
            # No iteration tags: treat each row as its own batch
            batch_times = df['total_batch_time']
        wall_time = float(batch_times.sum())

        analysis['overall_performance'] = {
            'avg_tokens_per_second': float(df['tokens_per_second'].mean()),
            'avg_batch_time': float(batch_times.mean()),
            'requests_per_second': len(results) / wall_time if wall_time > 0 else 0.0,
            'max_batch_time': float(batch_times.max()),
            'min_batch_time': float(batch_times.min())
        }

        # Per-model statistics
        for model_type, model_data in df.groupby('model_type', sort=False):
            analysis['model_performance'][model_type] = {
                'avg_tokens_per_second': float(model_data['tokens_per_second'].mean()),
                'avg_output_tokens': float(model_data['output_tokens'].mean()),
                'request_count': len(model_data)
            }
        return analysis
def create_model_configs() -> Dict[str, ModelConfig]:
    """Return the default settings for the four LoRA adapters, keyed by model type."""
    adapter_specs = (
        # (name, adapter_path, lora_id, max_tokens, temperature, top_p)
        ("autocomplete", "/home/ubuntu/deepseek-coder/models/autocomplete-finetuned", 1, 128, 0.3, 0.9),
        ("prompt", "/home/ubuntu/deepseek-coder/models/prompt-finetuned", 2, 512, 0.6, 0.95),
        ("comment", "/home/ubuntu/deepseek-coder/models/comment-finetuned", 3, 256, 0.4, 0.9),
        ("error_fix", "/home/ubuntu/deepseek-coder/models/error-fix-finetuned", 4, 512, 0.5, 0.9),
    )
    configs = {}
    for name, adapter_path, lora_id, max_tokens, temperature, top_p in adapter_specs:
        # The dict key and the adapter name are the same string
        configs[name] = ModelConfig(
            name=name,
            adapter_path=adapter_path,
            lora_id=lora_id,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
        )
    return configs
# Script entry point: benchmark the basic multi-LoRA engine
if __name__ == "__main__":
    # Base model location and per-adapter settings
    base_model_path = "/home/ubuntu/deepseek-coder/models/base-model"
    model_configs = create_model_configs()

    # Build and start the engine
    engine = VLLMMultiLoRAEngine(base_model_path, model_configs)
    engine.initialize_engine()

    # One sample prompt per model type
    test_cases = dict(
        autocomplete="def fibonacci(n):",
        prompt="Write a Python function to calculate factorial",
        comment="# Sort a list using quicksort algorithm",
        error_fix="Fix this code: def hello(\n print('Hello')",
    )

    # Run the benchmark and print the summary as JSON
    tester = ParallelPerformanceTester(engine)
    results = tester.run_concurrent_benchmark(test_cases, concurrent_requests=8)
    print("\n📊 병렬 성능 분석 결과:")
    report = json.dumps(results, indent=2, ensure_ascii=False)
    print(report)
import asyncio
import torch
import time
import logging
from typing import Dict, List, Optional, Any
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import numpy as np
from dataclasses import dataclass
import re
# Logging setup for the second half of the file; rebinds the module-level logger.
# NOTE(review): logging.basicConfig was already called earlier in this file, so
# this second call presumably has no effect — confirm.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DynamicModelConfig:
    """Settings for one LoRA adapter used by the enhanced (adaptive) engine.

    The engine reads ``name``, ``lora_id`` and ``adapter_path`` when it builds
    the adapter request, ``temperature`` and ``top_p`` for sampling, and
    ``base_tokens`` as the fallback token limit when a request has no
    allocation.
    """

    name: str                  # adapter name passed to the LoRA request
    adapter_path: str          # filesystem path of the fine-tuned adapter
    lora_id: int               # integer id passed to the LoRA request
    base_tokens: int           # fallback token limit when nothing was allocated
    max_tokens: int            # intended upper bound on generated tokens
    temperature: float         # sampling temperature
    top_p: float               # nucleus-sampling threshold
    priority_weight: float     # presumably a scheduling weight — TODO confirm usage
    usage_history: List[float]  # presumably past usage samples — TODO confirm usage
@dataclass
class TokenBudget:
    """Token estimate and priority for a single request."""

    request_id: str        # identifier used as the key of the allocation map
    model_type: str        # adapter the request is routed to
    estimated_tokens: int  # estimated number of output tokens needed
    allocated_tokens: int  # tokens granted (initialised to the estimate)
    priority_score: float  # higher scores are allocated first
    complexity_level: str  # "simple", "medium" or "complex"
class IntelligentPromptRouter:
    """Regex-based router that picks the model type (adapter) for a prompt.

    Every model type owns a list of patterns; a prompt scores one point per
    matching pattern, weighted by the model's recorded success rate.
    """

    # Preference used only to break exact score ties. Instruction-style intents
    # come first because prompts such as "Fix this code: def f(" also match the
    # structural code patterns of ``autocomplete``.
    _TIE_BREAK_ORDER = ('error_fix', 'prompt', 'comment', 'autocomplete')

    def __init__(self):
        """Set up the pattern table and empty history/metric stores."""
        self.routing_patterns = {
            'autocomplete': [
                r'def\s+\w+\(',
                r'class\s+\w+',
                r'for\s+\w+\s+in',
                r'if\s+\w+.*:$',
                r'import\s+\w+',
                r'from\s+\w+\s+import'
            ],
            'prompt': [
                # Allow a few qualifier words ("write a Python function",
                # "create a small helper class") between article and noun.
                r'write\s+an?\s+(?:\w+\s+){0,3}function',
                r'create\s+an?\s+(?:\w+\s+){0,3}class',
                r'implement\s+.*algorithm',
                r'generate\s+code',
                r'build\s+a\s+program',
                r'develop\s+.*'
            ],
            'comment': [
                r'^#\s+.*',
                r'""".*"""',
                r"'''.*'''",
                r'//\s+.*',
                r'/\*.*\*/'
            ],
            'error_fix': [
                r'fix\s+this\s+code',
                r'error\s+in\s+.*',
                r'debug\s+.*',
                r'syntax\s+error',
                r'correct\s+.*',
                r'repair\s+.*'
            ]
        }
        self.routing_history = {}
        self.performance_metrics = {}

    def _tie_rank(self, model_type: str) -> int:
        """Position of ``model_type`` in the tie-break order (unknown types last)."""
        try:
            return self._TIE_BREAK_ORDER.index(model_type)
        except ValueError:
            return len(self._TIE_BREAK_ORDER)

    def classify_prompt(self, prompt: str) -> str:
        """Return the model type whose patterns best match ``prompt``.

        The score is the number of matching patterns times
        ``1 + 0.3 * success_rate``. Models without recorded metrics use the
        neutral rate 0.5, so unseen models are not penalised against used
        ones. If nothing matches, prompts shorter than 50 characters go to
        ``'autocomplete'`` and longer ones to ``'prompt'``.
        """
        prompt_lower = prompt.lower().strip()
        flags = re.IGNORECASE | re.MULTILINE

        scores = {}
        for model_type, patterns in self.routing_patterns.items():
            hits = sum(1 for pattern in patterns if re.search(pattern, prompt_lower, flags))
            success_rate = self.performance_metrics.get(model_type, {}).get('success_rate', 0.5)
            scores[model_type] = hits * (1 + success_rate * 0.3)

        if scores and max(scores.values()) > 0:
            # Highest score wins; exact ties follow _TIE_BREAK_ORDER
            return max(scores, key=lambda m: (scores[m], -self._tie_rank(m)))

        # Fallback: choose by prompt length
        return 'autocomplete' if len(prompt) < 50 else 'prompt'

    def update_performance_metrics(self, model_type: str, success: bool, response_time: float):
        """Record one request outcome for ``model_type``.

        Updates the request and success counters, the success rate and the
        running mean of the response time.
        """
        metrics = self.performance_metrics.setdefault(model_type, {
            'total_requests': 0,
            'success_count': 0,
            'avg_response_time': 0
        })
        metrics['total_requests'] += 1
        if success:
            metrics['success_count'] += 1
        total = metrics['total_requests']
        metrics['success_rate'] = metrics['success_count'] / total
        # Incremental mean: fold the new sample into the previous average
        metrics['avg_response_time'] = (
            metrics['avg_response_time'] * (total - 1) + response_time
        ) / total
class DynamicTokenAllocator:
    """Estimates per-request token needs and splits a shared token budget."""

    def __init__(self, total_token_budget: int = 2048):
        """Create an allocator that distributes ``total_token_budget`` tokens per batch."""
        self.total_token_budget = total_token_budget
        self.usage_stats = {}
        self.allocation_history = []

    def estimate_token_budget(self, prompt: str, model_type: str) -> "TokenBudget":
        """Estimate the output-token need of ``prompt`` for ``model_type``.

        A complexity score (word count plus counts of code markers) selects a
        base size of 64, 128 or 256 tokens, which is then scaled by a
        per-model multiplier.

        Returns:
            A ``TokenBudget`` whose ``allocated_tokens`` equals the estimate
            and whose ``request_id`` is generated from the current time.
        """
        # Simple textual indicators of prompt complexity
        complexity_indicators = {
            'length': len(prompt.split()),
            'code_blocks': prompt.count('```'),
            'functions': prompt.count('def '),
            'classes': prompt.count('class '),
            'loops': prompt.count('for ') + prompt.count('while '),
            'conditionals': prompt.count('if ') + prompt.count('elif ')
        }
        complexity_score = sum(complexity_indicators.values())
        if complexity_score < 10:
            complexity_level, base_tokens = "simple", 64
        elif complexity_score < 30:
            complexity_level, base_tokens = "medium", 128
        else:
            complexity_level, base_tokens = "complex", 256

        # Per-model scaling of the base size
        model_multipliers = {
            "autocomplete": 0.5,
            "prompt": 1.0,
            "comment": 0.7,
            "error_fix": 0.8
        }
        estimated_tokens = int(base_tokens * model_multipliers.get(model_type, 1.0))
        priority_score = self._calculate_priority(model_type, complexity_level)
        return TokenBudget(
            request_id=f"{time.time()}_{model_type}",
            model_type=model_type,
            estimated_tokens=estimated_tokens,
            allocated_tokens=estimated_tokens,
            priority_score=priority_score,
            complexity_level=complexity_level
        )

    def _calculate_priority(self, model_type: str, complexity_level: str) -> float:
        """Return base priority x complexity weight x recent-usage weight."""
        base_priorities = {
            "autocomplete": 1.5,  # needs real-time responses
            "prompt": 1.0,
            "comment": 0.8,
            "error_fix": 1.2
        }
        complexity_weights = {
            "simple": 1.2,  # simple work goes first
            "medium": 1.0,
            "complex": 0.8
        }

        # Usage weight grows with the mean of the five most recent token counts
        usage_weight = 1.0
        recent_usage = self.usage_stats.get(model_type, {}).get('recent_usage', [])
        if recent_usage:
            window = recent_usage[-5:]
            usage_weight = 1.0 + (sum(window) / len(window)) / 100.0

        return (base_priorities.get(model_type, 1.0) *
                complexity_weights.get(complexity_level, 1.0) *
                usage_weight)

    def allocate_dynamic_budget(self, requests: "List[TokenBudget]") -> Dict[str, int]:
        """Distribute the token budget over ``requests`` by descending priority.

        A request receives its estimate while budget remains, and never less
        than its guaranteed minimum ``max(32, estimate // 2)``. Every request
        gets an entry: once the budget is used up, the remaining requests
        receive their guaranteed minimum instead of being dropped from the
        result. The total can therefore exceed the budget, but only by those
        guaranteed minimums.

        Returns:
            Mapping of ``request_id`` to allocated tokens.
        """
        allocation = {}
        remaining_budget = self.total_token_budget
        for request in sorted(requests, key=lambda x: x.priority_score, reverse=True):
            min_tokens = max(32, request.estimated_tokens // 2)
            available_tokens = min(request.estimated_tokens, remaining_budget)
            allocated = max(min_tokens, available_tokens)
            allocation[request.request_id] = allocated
            # Clamp at zero so an overdraft does not inflate later comparisons
            remaining_budget = max(0, remaining_budget - allocated)
        return allocation

    def update_usage_stats(self, model_type: str, tokens_used: int,
                           response_time: float, success: bool):
        """Fold one finished request into the running statistics of ``model_type``."""
        stats = self.usage_stats.setdefault(model_type, {
            'total_requests': 0,
            'avg_tokens': 0,
            'avg_response_time': 0,
            'success_rate': 0,
            'recent_usage': []
        })
        stats['total_requests'] += 1
        total = stats['total_requests']
        previous = total - 1
        # Incremental means: previous average weighted by the previous count
        stats['avg_tokens'] = (stats['avg_tokens'] * previous + tokens_used) / total
        stats['avg_response_time'] = (stats['avg_response_time'] * previous + response_time) / total
        stats['success_rate'] = (stats['success_rate'] * previous + (1 if success else 0)) / total

        # Keep only the ten most recent token counts
        stats['recent_usage'].append(tokens_used)
        if len(stats['recent_usage']) > 10:
            stats['recent_usage'].pop(0)
class EnhancedVLLMMultiLoRAEngine:
    """vLLM multi-LoRA engine with automatic routing and dynamic token allocation."""

    def __init__(self, base_model_path: str, model_configs: "Dict[str, DynamicModelConfig]"):
        """Store the configuration and create the router and allocator.

        Args:
            base_model_path: Path of the base model handed to ``LLM``.
            model_configs: Adapter settings keyed by model type.
        """
        self.base_model_path = base_model_path
        self.model_configs = model_configs
        self.llm = None  # created by initialize_engine()
        self.performance_stats = {}
        # Dynamic components
        self.router = IntelligentPromptRouter()
        self.allocator = DynamicTokenAllocator()
        self.prefix_cache = {}  # prompt prefix -> time it was last processed

    def initialize_engine(self):
        """Create the vLLM ``LLM`` instance with LoRA, chunked prefill and prefix caching.

        Raises:
            Exception: Any error raised by ``LLM`` is logged and re-raised.
        """
        logger.info("🚀 Enhanced vLLM 다중 LoRA 엔진 초기화 중...")
        try:
            # Settings tuned for a T4 GPU
            self.llm = LLM(
                model=self.base_model_path,
                enable_lora=True,
                max_loras=4,                 # up to four adapters active at once
                max_lora_rank=32,            # LoRA rank limit
                max_cpu_loras=8,             # adapters kept in the CPU cache
                dtype="half",                # half precision instead of bfloat16 on T4
                max_model_len=2048,          # bounded by T4 memory
                gpu_memory_utilization=0.9,  # use 90% of GPU memory
                tensor_parallel_size=1,      # single GPU
                trust_remote_code=True,
                # Dynamic batching settings
                enable_chunked_prefill=True,
                max_num_batched_tokens=512,  # can be tuned
                enable_prefix_caching=True
            )
            logger.info("✅ Enhanced vLLM 엔진 초기화 완료")
        except Exception as e:
            logger.error(f"❌ vLLM 엔진 초기화 실패: {e}")
            raise

    def generate_adaptive_responses(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Route, budget and run ``requests``.

        Requests without ``model_type`` are classified by the router, and
        requests without ``request_id`` receive a generated one. Both values
        are written back into the request dict so results and allocations can
        be matched to their request.

        Args:
            requests: Dicts with at least ``prompt``; an empty list yields an
                empty result.

        Returns:
            Result dicts for every request whose batch succeeded. Failed
            batches are logged and contribute no results.
        """
        if not requests:
            return []
        import uuid  # local import keeps this block self-contained

        logger.info(f"🔄 {len(requests)}개 요청 적응형 처리 시작...")

        # 1. Automatic routing, remembering which requests were routed here
        auto_routed_ids = set()
        for req in requests:
            if 'model_type' not in req:
                req['model_type'] = self.router.classify_prompt(req['prompt'])
                auto_routed_ids.add(id(req))

        # 2. Token budget estimation, keyed by a request id that always exists
        token_budgets = []
        for req in requests:
            budget = self.allocator.estimate_token_budget(req['prompt'], req['model_type'])
            if req.get('request_id') is None:
                req['request_id'] = uuid.uuid4().hex
            budget.request_id = req['request_id']
            token_budgets.append(budget)

        # 3. Dynamic token allocation
        allocation = self.allocator.allocate_dynamic_budget(token_budgets)

        # 4. Execute per prefix group
        results = []
        for prefix, group_requests in self._group_by_prefix(requests).items():
            results.extend(self._execute_adaptive_batch(
                group_requests, allocation, prefix, token_budgets, auto_routed_ids
            ))
        return results

    def _group_by_prefix(self, requests: List[Dict]) -> Dict[str, List[Dict]]:
        """Group requests by the first 50 characters of their prompt."""
        groups = {}
        for req in requests:
            groups.setdefault(req['prompt'][:50], []).append(req)
        return groups

    def _execute_adaptive_batch(self, requests: List[Dict], allocation: Dict[str, int],
                                prefix: str, token_budgets: "List[TokenBudget]",
                                auto_routed_ids: Optional[set] = None) -> List[Dict]:
        """Run one prefix group, one vLLM batch per model type.

        Args:
            requests: Requests sharing ``prefix``; each needs ``model_type``.
            allocation: Allocated tokens keyed by request id.
            prefix: The shared prompt prefix, used for ``prefix_cache`` bookkeeping.
            token_budgets: Budgets of all requests, used to validate request ids.
            auto_routed_ids: ``id()`` values of the request dicts that were
                classified by the router. ``None`` keeps the legacy behaviour
                of reporting every request as auto-routed.

        Returns:
            Result dicts of the batches that succeeded.
        """
        if self.llm is None:
            raise RuntimeError(
                "vLLM engine is not initialized; call initialize_engine() first"
            )

        results = []
        budget_map = {b.request_id: b for b in token_budgets}

        # Group by model type, keeping first-seen order
        model_groups = {}
        for req in requests:
            model_groups.setdefault(req['model_type'], []).append(req)

        for model_type, model_requests in model_groups.items():
            config = self.model_configs[model_type]
            prompts = [req['prompt'] for req in model_requests]

            # Per-request token limit: the allocation if one exists, otherwise
            # the adapter's base size; never above the adapter's max_tokens.
            allocated_tokens = []
            for req in model_requests:
                budget = budget_map.get(req.get('request_id'))
                if budget:
                    tokens = allocation.get(budget.request_id, config.base_tokens)
                else:
                    tokens = config.base_tokens
                allocated_tokens.append(max(1, min(int(tokens), config.max_tokens)))

            # One SamplingParams per prompt, so each request is generated with
            # the limit reported for it rather than the batch average.
            sampling_params = [
                SamplingParams(
                    temperature=config.temperature,
                    max_tokens=tokens,
                    top_p=config.top_p,
                    stop=None
                )
                for tokens in allocated_tokens
            ]
            lora_request = LoRARequest(
                lora_name=config.name,
                lora_int_id=config.lora_id,
                lora_path=config.adapter_path
            )

            start_time = time.time()
            try:
                outputs = self.llm.generate(
                    prompts,
                    sampling_params,
                    lora_request=lora_request
                )
            except Exception as e:
                # Best effort: log, record the failures and go on with the next group
                logger.exception(f"배치 처리 실패 ({model_type}): {e}")
                for _ in model_requests:
                    self.allocator.update_usage_stats(model_type, 0, 0, False)
                    self.router.update_performance_metrics(model_type, False, 0)
                continue

            processing_time = time.time() - start_time
            per_request_time = processing_time / len(model_requests)
            for req, output, allocated in zip(model_requests, outputs, allocated_tokens):
                # ``output.outputs`` is a list of completions; use the first one
                completion = output.outputs[0]
                output_tokens = len(completion.token_ids)
                results.append({
                    'request_id': req.get('request_id'),
                    'model_type': model_type,
                    'generated_text': completion.text,
                    'input_tokens': len(output.prompt_token_ids),
                    'output_tokens': output_tokens,
                    'allocated_tokens': allocated,
                    'tokens_per_second': output_tokens / processing_time if processing_time > 0 else 0.0,
                    'processing_time': per_request_time,
                    'finish_reason': completion.finish_reason,
                    'prefix_cached': prefix in self.prefix_cache,
                    'auto_routed': True if auto_routed_ids is None else id(req) in auto_routed_ids
                })
                # Feed the outcome back into the allocator and the router
                self.allocator.update_usage_stats(
                    model_type, output_tokens, per_request_time, True
                )
                self.router.update_performance_metrics(
                    model_type, True, per_request_time
                )

            # Remember when this prefix was last processed
            self.prefix_cache[prefix] = time.time()
        return results

    def get_system_status(self) -> Dict[str, Any]:
        """Return router metrics, allocator statistics, cache size and token budget."""
        return {
            'router_performance': self.router.performance_metrics,
            'allocator_stats': self.allocator.usage_stats,
            'prefix_cache_size': len(self.prefix_cache),
            'total_token_budget': self.allocator.total_token_budget
        }
# Async server counterpart for the enhanced engine
class EnhancedAsyncMultiLoRAServer:
    """Asynchronous front end that batches queued requests for the adaptive engine.

    Callers await ``add_request``; a background task running
    ``process_adaptive_batch`` drains the queue and resolves their futures.
    """

    def __init__(self, engine: "EnhancedVLLMMultiLoRAEngine"):
        """Wrap ``engine``, which must provide ``generate_adaptive_responses``."""
        self.engine = engine
        self.request_queue = asyncio.Queue()
        self.response_futures = {}  # request_id -> Future awaiting the result

    async def add_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Queue ``request`` and wait for its result.

        A missing ``request_id`` is replaced by a generated unique id, which
        is written back into ``request``.

        Returns:
            The engine's result dict for this request.

        Raises:
            Exception: The batch failure, or ``RuntimeError`` when the engine
                returned no result for this request.
        """
        import uuid  # local import keeps this block self-contained

        request_id = request.get('request_id')
        if request_id is None:
            # A random id cannot collide the way a timestamp plus a prompt hash can
            request_id = uuid.uuid4().hex
        request['request_id'] = request_id

        future = asyncio.get_running_loop().create_future()
        self.response_futures[request_id] = future
        try:
            await self.request_queue.put(request)
            return await future
        finally:
            # Drop the entry even when the caller is cancelled, so it cannot leak
            self.response_futures.pop(request_id, None)

    async def _collect_batch(self, batch_size: int) -> List[Dict[str, Any]]:
        """Wait for one request, then gather up to ``batch_size`` in total.

        Each further slot waits at most one second; the first timeout closes
        the batch.
        """
        batch = [await self.request_queue.get()]
        while len(batch) < batch_size:
            try:
                batch.append(
                    await asyncio.wait_for(self.request_queue.get(), timeout=1.0)
                )
            except asyncio.TimeoutError:
                break
        return batch

    def _fail_pending(self, batch_requests: List[Dict[str, Any]], error: Exception):
        """Set ``error`` on every still-unresolved future of ``batch_requests``."""
        for request in batch_requests:
            future = self.response_futures.pop(request.get('request_id'), None)
            if future is not None and not future.done():
                future.set_exception(error)

    async def process_adaptive_batch(self, batch_size: int = 4):
        """Run forever, serving queued requests in adaptive batches.

        Args:
            batch_size: Maximum number of requests per engine call.
        """
        loop = asyncio.get_running_loop()
        while True:
            batch_requests = await self._collect_batch(batch_size)
            try:
                # The engine call blocks; run it in a worker thread so the
                # event loop keeps accepting new requests meanwhile.
                results = await loop.run_in_executor(
                    None, self.engine.generate_adaptive_responses, batch_requests
                )
            except Exception as e:
                # On failure, propagate the error to every waiter in the batch
                self._fail_pending(batch_requests, e)
                continue

            # Deliver each result to the future registered for its request
            for result in results:
                future = self.response_futures.pop(result.get('request_id'), None)
                if future is not None and not future.done():
                    future.set_result(result)

            # The engine may return fewer results than requests; the affected
            # callers would otherwise wait forever.
            self._fail_pending(
                batch_requests,
                RuntimeError("engine returned no result for this request")
            )
class EnhancedParallelPerformanceTester:
    """Benchmark helper for the adaptive engine (routing and token allocation)."""

    def __init__(self, engine: "EnhancedVLLMMultiLoRAEngine"):
        """Wrap ``engine``, which must provide ``generate_adaptive_responses``."""
        self.engine = engine

    @staticmethod
    def _safe_ratio(numerator, denominator) -> float:
        """Return ``numerator / denominator`` as a float, or 0.0 for a zero denominator."""
        return float(numerator) / float(denominator) if denominator else 0.0

    def run_adaptive_benchmark(self, test_cases: Dict[str, str],
                               concurrent_requests: int = 8,
                               iterations: int = 3) -> Dict[str, Any]:
        """Send prompts without a model type so the engine routes them itself.

        Args:
            test_cases: Prompts to cycle through; the keys are labels only.
            concurrent_requests: Number of requests submitted per iteration.
            iterations: Number of repetitions.

        Returns:
            The summary produced by ``_analyze_adaptive_results``.

        Raises:
            ValueError: If ``test_cases`` is empty.
        """
        if not test_cases:
            raise ValueError("test_cases must contain at least one prompt")
        logger.info(f"🧪 적응형 벤치마크 시작 ({concurrent_requests}개 동시 요청)")

        test_prompts = list(test_cases.values())  # built once instead of once per request
        all_results = []
        for iteration in range(iterations):
            logger.info(f"반복 {iteration + 1}/{iterations}")

            # model_type is omitted on purpose to exercise automatic routing
            requests = [
                {
                    'request_id': f"adaptive_{iteration}_{i}",
                    'prompt': test_prompts[i % len(test_prompts)]
                }
                for i in range(concurrent_requests)
            ]

            start_time = time.time()
            results = self.engine.generate_adaptive_responses(requests)
            total_time = time.time() - start_time

            # Tag every result with its iteration and the iteration's wall time
            for result in results:
                result['iteration'] = iteration + 1
                result['total_batch_time'] = total_time
                all_results.append(result)
        return self._analyze_adaptive_results(all_results, concurrent_requests)

    def _analyze_adaptive_results(self, results: List[Dict], concurrent_requests: int) -> Dict[str, Any]:
        """Aggregate adaptive results into overall and per-model statistics.

        ``total_batch_time`` is repeated on every row of an iteration, so batch
        statistics are computed once per iteration. Summing it over all rows
        would count each batch once per request and under-report throughput.
        ``auto_routing_accuracy`` is the share of results flagged
        ``auto_routed``.

        Returns:
            A dict of plain Python numbers. For empty ``results`` the nested
            entries are empty and ``auto_routing_accuracy`` is 0.0.
        """
        import pandas as pd

        analysis = {
            'total_requests': len(results),
            'concurrent_requests': concurrent_requests,
            'auto_routing_accuracy': 0.0,
            'model_performance': {},
            'token_allocation_efficiency': {},
            'overall_performance': {}
        }
        if not results:
            return analysis

        df = pd.DataFrame(results)
        if 'iteration' in df.columns:
            batch_times = df.groupby('iteration')['total_batch_time'].first()
        else:
            # No iteration tags: treat each row as its own batch
            batch_times = df['total_batch_time']

        analysis['auto_routing_accuracy'] = self._safe_ratio(df['auto_routed'].sum(), len(df))
        analysis['overall_performance'] = {
            'avg_tokens_per_second': float(df['tokens_per_second'].mean()),
            'avg_batch_time': float(batch_times.mean()),
            'requests_per_second': self._safe_ratio(len(results), batch_times.sum()),
            'avg_allocated_tokens': float(df['allocated_tokens'].mean()),
            'token_utilization_rate': self._safe_ratio(
                df['output_tokens'].sum(), df['allocated_tokens'].sum()
            )
        }

        # Per-model statistics
        for model_type, model_data in df.groupby('model_type', sort=False):
            analysis['model_performance'][model_type] = {
                'avg_tokens_per_second': float(model_data['tokens_per_second'].mean()),
                'avg_output_tokens': float(model_data['output_tokens'].mean()),
                'avg_allocated_tokens': float(model_data['allocated_tokens'].mean()),
                'request_count': len(model_data),
                'token_efficiency': self._safe_ratio(
                    model_data['output_tokens'].sum(), model_data['allocated_tokens'].sum()
                )
            }
        return analysis
def create_enhanced_model_configs() -> Dict[str, DynamicModelConfig]:
    """Return the default dynamic settings for the four LoRA adapters, keyed by model type."""
    adapter_specs = (
        # (name, adapter_path, lora_id, base_tokens, max_tokens,
        #  temperature, top_p, priority_weight)
        ("autocomplete", "/home/ubuntu/deepseek-coder/models/autocomplete-finetuned",
         1, 64, 256, 0.3, 0.9, 1.5),
        ("prompt", "/home/ubuntu/deepseek-coder/models/prompt-finetuned",
         2, 128, 512, 0.6, 0.95, 1.0),
        ("comment", "/home/ubuntu/deepseek-coder/models/comment-finetuned",
         3, 96, 384, 0.4, 0.9, 0.8),
        ("error_fix", "/home/ubuntu/deepseek-coder/models/error-fix-finetuned",
         4, 128, 512, 0.5, 0.9, 1.2),
    )
    configs = {}
    for (name, adapter_path, lora_id, base_tokens, max_tokens,
         temperature, top_p, priority_weight) in adapter_specs:
        # The dict key and the adapter name are the same string
        configs[name] = DynamicModelConfig(
            name=name,
            adapter_path=adapter_path,
            lora_id=lora_id,
            base_tokens=base_tokens,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            priority_weight=priority_weight,
            usage_history=[],  # a fresh list per adapter
        )
    return configs
# Script entry point: benchmark the enhanced (adaptive) engine
if __name__ == "__main__":
    # Base model location and per-adapter settings
    base_model_path = "/home/ubuntu/deepseek-coder/models/base-model"
    model_configs = create_enhanced_model_configs()

    # Build and start the enhanced engine
    engine = EnhancedVLLMMultiLoRAEngine(base_model_path, model_configs)
    engine.initialize_engine()

    # Sample prompts; no model type is given, so automatic routing is exercised
    test_cases = dict(
        auto_complete_test="def fibonacci(n):",
        prompt_test="Write a Python function to calculate factorial",
        comment_test="# Sort a list using quicksort algorithm",
        error_fix_test="Fix this code: def hello(\n print('Hello')",
    )

    # Run the adaptive benchmark and print the summary as JSON
    tester = EnhancedParallelPerformanceTester(engine)
    results = tester.run_adaptive_benchmark(test_cases, concurrent_requests=8)
    print("\n📊 적응형 병렬 성능 분석 결과:")
    report = json.dumps(results, indent=2, ensure_ascii=False)
    print(report)

    # Print the engine's internal status
    print("\n🔍 시스템 상태:")
    status = engine.get_system_status()
    status_report = json.dumps(status, indent=2, ensure_ascii=False)
    print(status_report)
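# NOTE: 초기에 작업된 코드라 현재 진행 상황에서는 많이 변경되었음.
# (This is early-stage code; the current implementation has changed substantially since it was written.)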