이제 막 프론트엔드+백엔드+모델 서빙하는 것을 끝냈는데 쉴 새 없이 또 새로운 개념을 익혀야 한다... 정말 휘몰아치는구나... 하루에 18시간씩 공부해도 진도를 따라갈 수가 없다 ㅠㅠ
학습시간 09:00~03:00(당일18H/누적2114H)
A. 서빙 환경
B. 서빙 퍼포먼스
A. 자기회귀(Autoregressive) 모델의 특징
B. KV 캐시 (Key-Value Cache)
C. KV 캐시의 한계

A. PagedAttention의 등장
B. 작동 원리

# vLLM을 사용한 간단한 텍스트 생성 예시
from vllm import LLM, SamplingParams
# 모델 로드
llm = LLM(model="facebook/opt-125m")
# 생성할 프롬프트
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
]
# 샘플링 파라미터 설정
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=50)
# 텍스트 생성 실행
responses = llm.generate(prompts, sampling_params)
# 결과 출력
for response in responses:
prompt = response.prompt
generated_text = response.outputs[0].text
print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
| 비교 대상 | 특징 | vLLM의 장점 |
|---|---|---|
| HF Transformers | 가장 기본적인 모델 로딩 및 추론 라이브러리 | PagedAttention 부재로 메모리 효율 낮고 처리량 저조 |
| TGI | HF에서 만든 서빙 전용 프레임워크 | vLLM 대비 여전히 낮은 처리량과 메모리 효율 |
| vLLM | PagedAttention 기반 LLM 서빙 최적화 | 압도적인 처리량과 메모리 효율성 |
| 기능 | 설명 |
|---|---|
| InferenceService | KServe의 핵심 구성 요소로, 모델 배포의 모든 설정을 정의 |
| Predictor | 실제 모델 추론을 담당하는 부분 |
| Transformer | 모델 입출력 데이터의 전/후처리를 담당하는 선택적 요소 |
| Knative | 서버리스 기능을 제공하며, 트래픽에 따라 Pod 수를 자동으로 조절 (Auto-scaling) |
A. LLM 엔진 로딩
vllm 라이브러리에서 LLM 클래스를 가져와서 사용할 모델과 여러 옵션을 지정하여 객체를 생성model: 허깅페이스에 등록된 모델 이름을 그대로 사용max_model_len: 모델이 처리할 수 있는 최대 토큰 길이 (입력+출력)gpu_memory_utilization: 전체 GPU 메모리 중 vLLM이 KV 캐시 등으로 사용할 비율을 지정 (0.0 ~ 1.0)from vllm import LLM
# vLLM 엔진을 초기화하면서 사용할 모델과 옵션을 설정
llm = LLM(
model="facebook/opt-125m",
max_model_len=512,
gpu_memory_utilization=0.5 # GPU 메모리의 50%를 사용하도록 설정
)
A. 샘플링 파라미터 설정
SamplingParams 클래스를 이용해 텍스트를 어떻게 생성할지 상세하게 조절temperature: 값이 높을수록 모델이 더 창의적이고 무작위적인 텍스트를 생성 (보통 0.7 ~ 1.0)top_p: 확률이 높은 순서대로 토큰을 정렬했을 때, 누적 확률이 p가 될 때까지의 후보군에서만 샘플링max_tokens: 생성할 텍스트의 최대 길이를 지정B. 텍스트 생성 및 결과 확인
llm 객체의 generate() 메서드를 호출하여 텍스트 생성을 실행generate()의 첫 번째 인자로는 프롬프트(prompt) 리스트를, 두 번째 인자로는 SamplingParams 객체를 전달outputs 리스트를 순회하면서 각 프롬프트에 대한 생성 결과(텍스트, 토큰 ID 등)를 확인 가능from vllm import LLM, SamplingParams
# 텍스트 생성 방법을 정의하는 샘플링 파라미터
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
max_tokens=100
)
# 생성할 텍스트의 시작점(프롬프트)
prompt = "The future of AI is "
# 프롬프트와 샘플링 파라미터를 이용해 텍스트 생성
outputs = llm.generate([prompt], sampling_params)
# 결과 확인
for output in outputs:
print(f"Prompt: {output.prompt}")
print(f"Generated text: {output.outputs[0].text}")
print(f"Tokens generated: {len(output.outputs[0].token_ids)}")
A. 개별 처리 vs 배치 처리
for 문을 돌면서 하나씩 generate()로 처리하는 방식generate()에 한 번에 넘겨 처리하는 방식B. 성능 비교
import time
from vllm import LLM, SamplingParams
llm = LLM(model="facebook/opt-125m")
sampling_params = SamplingParams(max_tokens=50)
prompts = [f"Test prompt {i}" for i in range(8)] # 테스트용 프롬프트 8개
# 1. 개별 처리 시간 측정
start_time = time.time()
for prompt in prompts:
llm.generate([prompt], sampling_params)
individual_time = time.time() - start_time
# 2. 배치 처리 시간 측정
start_time = time.time()
llm.generate(prompts, sampling_params)
batch_time = time.time() - start_time
print(f"개별 처리 소요 시간: {individual_time}")
print(f"배치 처리 소요 시간: {batch_time}")
# 처리량(Throughput) 측정
batch_sizes = [1, 2, 4, 8]
throughputs = []
for batch in batch_sizes:
batch_prompts = [f"Test prompt {i}" for i in range(batch)]
start_time = time.time()
_ = llm.generate(batch_prompts, sampling_params)
elapsed_time = time.time() - start_time
throughput = batch / elapsed_time # (처리한 요청 수) / (걸린 시간)
throughputs.append(throughput)
print(f"Batch size {batch}: {elapsed_time:.4f} sec, Throughput: {throughput:.2f} req/sec")
A. vLLM 프로세스 확인
nvidia-smi 명령어를 통해 실제 GPU를 사용하는 프로세스를 확인할 수 있음VLLM::EngineCore 라는 이름의 프로세스가 GPU 메모리를 점유하고 있는 것을 볼 수 있음B. 프로세스 강제 종료
kill -9) 해줘야 함# 1. nvidia-smi로 VLLM 엔진의 PID(프로세스 ID) 확인
!nvidia-smi
# 2. grep과 awk를 조합해 PID를 자동으로 찾아 종료 (더 편리한 방법)
!kill -9 $(nvidia-smi | grep VLLM::EngineCore | awk '{print $5}')
# 3. 확인된 PID를 직접 입력하여 종료
# !kill -9 3213060
A. 실행 방법
python -m vllm.entrypoints.openai.api_server 명령어로 서버를 실행B. 주요 실행 옵션(Arguments)
-model: 사용할 모델 이름-port: API 서버를 실행할 포트 번호-max-model-len: 모델이 처리할 최대 토큰 길이-gpu-memory-utilization: 사용할 GPU 메모리 비율# facebook/opt-125m 모델을 8000번 포트에서 API 서버로 실행
!python -m vllm.entrypoints.openai.api_server \
--model facebook/opt-125m \
--port 8000 \
--max-model-len 256 \
--gpu-memory-utilization 0.5
A. 비동기(Asynchronous) 요청
asyncio와 aiohttp 라이브러리를 사용하면, 여러 개의 API 요청을 동시에 보내고 응답을 기다리는 비동기 코드를 쉽게 작성할 수 있음B. Latency 및 Health Check
aiohttp.ClientSession을 사용해 서버에 POST 요청을 보내고, 각 요청이 완료되기까지 걸리는 시간(Latency)을 측정import asyncio
import aiohttp
import time
# 동시에 보낼 요청의 수
NUM_REQUESTS = 10
URL = "http://localhost:8000/v1/completions"
# 비동기로 API 요청을 보내는 함수
async def make_request(session, request_id):
payload = {
"model": "facebook/opt-125m",
"prompt": f"Test request {request_id}: The weather is ",
"max_tokens": 30,
"temperature": 0.7
}
start_time = time.time()
async with session.post(URL, json=payload) as response:
result = await response.json()
latency = time.time() - start_time
print(f"Request {request_id} - Latency: {latency:.4f} sec, Status: {response.status}")
return {
"request_id": request_id,
"latency": latency,
"status": response.status
}
# 메인 비동기 실행 함수
async def benchmark_api(num_requests=NUM_REQUESTS):
async with aiohttp.ClientSession() as session:
# 정해진 수만큼 요청 작업을 생성
tasks = [make_request(session, i) for i in range(num_requests)]
# 모든 작업을 동시에 실행하고 결과를 기다림
results = await asyncio.gather(*tasks)
return results
# 비동기 함수 실행
# await benchmark_api()
A. 배치 사이즈별 성능 테스트
batch_sizes 리스트를 만들어 동시 요청 수를 늘려가며 전체 작업 완료 시간(batch_time)을 측정import asyncio
import aiohttp
import time
URL = "http://localhost:8000/v1/completions"
async def batch_efficiency_test():
# 테스트할 동시 요청 수 목록
batch_sizes = [1, 2, 4, 8, 16]
results = []
async with aiohttp.ClientSession() as session:
for batch_size in batch_sizes:
print(f"Testing with batch size: {batch_size}")
tasks = []
for i in range(batch_size):
payload = {
"model": "facebook/opt-125m",
"prompt": f"Test request {i}: The weather is ",
"max_tokens": 30,
"temperature": 0.7
}
# aiohttp.post는 코루틴이 아니므로 await을 붙이지 않음
# session.post 자체를 태스크로 만들어야 함
task = asyncio.create_task(session.post(URL, json=payload))
tasks.append(task)
start_time = time.time()
# 모든 요청이 끝날 때까지 기다림
responses = await asyncio.gather(*tasks)
batch_time = time.time() - start_time
# 응답 상태 확인 (옵션)
success_count = sum(1 for r in responses if r.status == 200)
print(f"-> Total time for {batch_size} requests: {batch_time:.4f} sec")
print(f"-> Successful responses: {success_count}/{batch_size}\n")
results.append({"batch_size": batch_size, "time": batch_time})
return results
# 비동기 함수 실행
# await batch_efficiency_test()