오늘은 미래의 AI 서비스 핵심이 될 서버리스(Serverless) 환경에서의 LLM(Large Language Model) 추론 최적화에 대한 흥미로운 사전 실험 이야기를 공유하려 합니다. 특히, 수백 GB에 달하는 LLM을 서버리스 환경에서 서비스할 때 발생하는 고질적인 문제인 콜드 스타트(Cold Start) 지연과 메모리/디스크 제한을 GPU 없이 어떻게 탐색하고 개선 가능성을 엿볼 수 있는지에 초점을 맞췄습니다.
아직 GPU는 없지만, 아이디어를 검증하고 핵심 과제를 깊이 이해하는 데는 얼마든지 CPU와 파이토치(PyTorch), 도커(Docker)만으로도 충분하다는 것을 보여드리고자 합니다.
서버리스 아키텍처는 유휴 시 자원 소모가 없고 자동으로 확장/축소되는 매력적인 컴퓨팅 모델입니다. 하지만 GPT-4, LLaMA-2 70B와 같은 초대형 LLM을 서버리스 함수(예: AWS Lambda)에 배포할 때는 다음과 같은 심각한 기술적 과제에 직면합니다.
이러한 문제들 때문에 서버리스 LLM 추론은 비용 효율성과 응답성 사이에서 어려운 트레이드오프에 놓이게 됩니다.
저희의 목표는 다음과 같습니다.
torch
, transformers
GPU 없이 LLM을 다루기 위해, 메모리에 로드 가능한 아주 작은 LLM을 선택합니다. Hugging Face transformers
라이브러리를 활용했습니다.
여기서는 예시로 google/gemma-2b
모델을 사용하지만, 더 작은 stabilityai/stablelm-2-zephyr-1_6b
나 Qwen-1.8B-Chat
등도 좋은 선택입니다. 핵심은 torch_dtype=torch.float16
을 사용하여 메모리 사용량을 줄이는 것입니다.
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import time
import os
model_name = "google/gemma-2b" # CPU에서 로드 가능한 작은 LLM 선택
print(f"Loading tokenizer and model: {model_name}")
tokenizer = AutoTokenizer.from_pretrained(model_name)
# float16으로 로드하여 메모리 절약 (CPU에서도 작동)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
model.to("cpu") # 명시적으로 CPU로 이동
# 전체 모델의 state_dict 저장 (이후 부분 로딩 시뮬레이션에 사용)
model_file_path = f"{model_name.split('/')[-1]}_full.pth"
print(f"Saving full model state_dict to {model_file_path}")
torch.save(model.state_dict(), model_file_path)
# 모델 크기 추정 (메모리에 로드된 상태 기준)
total_params = sum(p.numel() for p in model.parameters())
total_memory_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
print(f"Total parameters: {total_params / 1e6:.2f} M")
print(f"Estimated full model size in memory: {total_memory_bytes / (1024**3):.2f} GB")
# 예: Gemma 2B float16의 경우 약 3.8GB
가장 기본적인 시나리오로, 단일 서버리스 함수 인스턴스에서 전체 모델을 로드할 때 걸리는 시간을 측정합니다. 이는 콜드 스타트의 순수 모델 로딩 시간을 흉내 냅니다.
# 기존 모델 제거 (콜드 스타트 시뮬레이션을 위해)
del model
torch.cuda.empty_cache() # GPU 없어도 호출 가능
print("\n--- Simulating Single Instance Cold Start ---")
start_time = time.time()
# 모델을 다시 로드 (실제 서버리스 함수가 처음 기동될 때의 상황)
model_loaded_single = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
model_loaded_single.to("cpu")
end_time = time.time()
single_load_time = end_time - start_time
print(f"Single instance full model loading time: {single_load_time:.2f} seconds")
이제 핵심 아이디어인 모델 분할을 시뮬레이션합니다. 전체 모델의 state_dict
를 여러 개의 부분으로 나누고, 각 부분을 별도의 "가상 인스턴스" (여기서는 멀티프로세싱의 Process
를 사용)에서 동시에 로드하는 시나리오를 만듭니다.
from multiprocessing import Process, Queue, Pipe
# 더미 모델 클래스 (실제 LLM 아키텍처를 단순화)
class DummyPartialModel(torch.nn.Module):
def __init__(self, partial_state_dict):
super().__init__()
# 가중치 텐서들이 메모리에 로드되는 것을 시뮬레이션
for k, v in partial_state_dict.items():
# 실제 모델처럼 복잡한 구조를 가지지 않지만, 메모리 점유는 흉내냄
self.register_buffer(k.replace('.', '_'), v) # 텐서를 모델의 버퍼로 등록
def forward(self, x):
# 실제 추론 로직은 없지만, 입력/출력 텐서 전달을 시뮬레이션
return x # 통과만 시킴
def worker_load_and_process(worker_id, partial_state_dict, start_pipe, end_pipe, input_q, output_q):
print(f"Worker {worker_id}: Starting to load model part...")
# 콜드 스타트 시뮬레이션 (모델 부분 로딩)
load_start_time = time.time()
partial_model_instance = DummyPartialModel(partial_state_dict).to("cpu")
load_end_time = time.time()
load_time = load_end_time - load_start_time
print(f"Worker {worker_id}: Part loaded in {load_time:.2f} seconds.")
# 로딩 완료 시간 부모 프로세스에 전달
end_pipe.send(load_time)
# 이제 '워밍업'된 상태에서 추론 요청 대기 시뮬레이션
while True:
data = input_q.get() # 이전 인스턴스에서 데이터 수신
if data is None: # 종료 신호
break
# 가상의 추론 연산 시간 시뮬레이션 (아주 짧게)
# 실제 LLM은 복잡한 연산을 하므로, GPU에서 이 시간이 훨씬 길어짐
time.sleep(0.005)
# 다음 인스턴스로 결과 전달 (직렬화/역직렬화 오버헤드 포함)
output_q.put(data) # 실제로는 처리된 결과 전달
print(f"Worker {worker_id}: Exiting.")
# --- 메인 실행 로직 ---
print("\n--- Simulating Collaborative Inference ---")
# 저장된 전체 state_dict 불러오기
full_state_dict = torch.load(model_file_path, map_location='cpu')
num_parts = 4 # 모델을 4개 부분으로 분할 (예시)
all_keys = list(full_state_dict.keys())
keys_per_part = len(all_keys) // num_parts
partial_state_dicts = []
for i in range(num_parts):
start_idx = i * keys_per_part
end_idx = (i + 1) * keys_per_part if i < num_parts - 1 else len(all_keys)
part_keys = all_keys[start_idx:end_idx]
partial_dict = {key: full_state_dict[key] for key in part_keys}
partial_state_dicts.append(partial_dict)
print(f"Model split into {num_parts} parts.")
# 프로세스 및 큐 생성
processes = []
load_pipes = [] # 각 워커의 로딩 시간을 받기 위한 파이프
queues = [Queue() for _ in range(num_parts + 1)] # 첫 큐는 입력, 마지막 큐는 최종 출력
for i in range(num_parts):
parent_conn, child_conn = Pipe()
load_pipes.append(parent_conn)
p = Process(target=worker_load_and_process,
args=(i, partial_state_dicts[i], child_conn, parent_conn, queues[i], queues[i+1]))
processes.append(p)
p.start()
# 모든 워커의 로딩 시간 수집 (가장 오래 걸린 워커가 전체 콜드 스타트 시간 결정)
individual_load_times = []
for pipe in load_pipes:
individual_load_times.append(pipe.recv())
max_collaborative_load_time = max(individual_load_times)
print(f"Max (Simulated Cold Start) time for Collaborative Inference: {max_collaborative_load_time:.2f} seconds")
# --- 가상 추론 요청 시뮬레이션 ---
# 모델이 다 로드된 후 (워밍업된 상태) 실제 추론 요청이 들어오는 상황
print("\n--- Simulating Inference Request after Cold Start ---")
dummy_input_tensor = torch.randn(1, 10, 768, dtype=torch.float16) # 임의의 입력 텐서
# (여기서 768은 임베딩 차원, 실제 모델의 입력 shape에 맞게 조절 필요)
inference_start_time = time.time()
queues[0].put(dummy_input_tensor) # 첫 번째 워커에게 입력 전달
final_output = queues[num_parts].get() # 최종 워커로부터 결과 수신
inference_end_time = time.time()
print(f"Total simulated inference time (incl. inter-process communication): {inference_end_time - inference_start_time:.2f} seconds")
# 모든 워커 프로세스 종료
for q in queues[:-1]: # 마지막 큐 제외
q.put(None) # 종료 신호 전달
for p in processes:
p.join() # 프로세스 종료 대기
print("\n--- Experiment Summary ---")
print(f"Single instance cold start time: {single_load_time:.2f} seconds")
print(f"Collaborative inference (N={num_parts}) max cold start time: {max_collaborative_load_time:.2f} seconds")
if single_load_time > max_collaborative_load_time:
print(f"Conclusion: Collaborative inference reduced cold start time by {single_load_time - max_collaborative_load_time:.2f} seconds!")
else:
print("Conclusion: Collaborative inference did not significantly reduce cold start time in this simulation.")
위의 multiprocessing
예제는 로컬 프로세스 간의 협업을 보여주지만, 더 현실적인 서버리스 환경을 시뮬레이션하려면 Docker 컨테이너를 사용할 수 있습니다. 각 서버리스 함수는 격리된 컨테이너에서 실행되므로, 이 시뮬레이션이 더 적합합니다.
Dockerfile 작성:
# Dockerfile
FROM python:3.9-slim-buster
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY your_experiment_script.py .
CMD ["python", "your_experiment_script.py"]
requirements.txt
에는 torch
, transformers
등을 명시합니다.
실험 스크립트 수정: 각 Docker 컨테이너는 독립적이므로, multiprocessing
대신 각 컨테이너가 전체 모델의 '부분'만 로드하도록 스크립트를 변경하고, 컨테이너 간 통신은 실제 네트워크 통신(예: 간단한 HTTP API)을 흉내 내야 합니다. 이 부분은 복잡도가 있으므로, 사전 검증 초기 단계에서는 multiprocessing
시뮬레이션으로 충분할 수 있습니다.
docker run
명령어를 사용하여 컨테이너를 시작하고, 이 컨테이너 내에서 모델 로딩 스크립트를 실행하는 시간을 측정합니다. 여러 개의 컨테이너를 동시에 띄워 네트워크 지연을 시뮬레이션할 수도 있습니다.time docker run --rm your_image_name python your_script.py
위 예시 코드의 결과는 다음과 유사하게 나타날 것입니다 (모델 크기 및 CPU 성능에 따라 상이):
주요 시사점:
Queue
를 통해 간단히 흉내 냈지만, 실제 분산 환경에서는 이 부분이 중요한 병목이 될 수 있음을 인지해야 합니다.GPU 없이 진행된 이번 사전 실험을 통해 "모델 분할 및 협업 추론"이라는 아이디어가 서버리스 환경에서의 LLM 콜드 스타트 문제를 효과적으로 완화할 수 있는 잠재력을 가지고 있음을 확인했습니다. 특히, 콜드 스타트 지연과 메모리/디스크 제한 측면에서 개선 가능성을 엿볼 수 있었습니다.
물론, 이 실험은 GPU 연산을 포함하지 않으며, 실제 LLM 추론의 복잡한 파이프라인 병렬 처리 및 GPU 메모리 관리 등은 고려되지 않았습니다. 하지만 CPU 환경에서 I/O 및 로딩 시간이라는 핵심 병목을 시뮬레이션했다는 점에서 큰 의미가 있습니다.
향후 연구 방향은 다음과 같습니다.
GPU 없이 시작한 작은 실험이지만, 서버리스 LLM 최적화라는 큰 목표를 향한 첫걸음을 내딛는 데 중요한 인사이트를 얻었습니다. 다음 글에서는 실제 GPU 환경에서의 실험 결과와 더 깊이 있는 최적화 기법에 대해 다뤄볼 수 있기를 기대합니다.