KFModel
- kfserving에서 각 프레임워크별로 서버를 만들때 상속받는 class로 kfserving의 기본 flow가 구현되어 있다.
- http client (predictor, transformer 등과의 통신에 사용)
- load model
- preprocess
- predict
- postprocess
- explain
KFServer
- Serving을 위한 Server 구현부
- 아래의 외부 image를 사용하는 것을 제외하고, code를 Docker build하는 부분에서 공통으로 사용하는 Server
- Server는 tornado web app server를 사용
- KFServer.start(KFModel)로 Server를 구동
tensorflow server
"image": "tensorflow/serving"
onnx server
"image": "mcr.microsoft.com/onnxruntime/server"
triton server
"image": "nvcr.io/nvidia/tritonserver"
lightgbm
"image": "kfserving/lgbserver"
pytorch server
import kfserving
import os
from typing import Dict
import torch
import importlib
import sys
# Expected filename of the serialized PyTorch weights inside model_dir.
PYTORCH_FILE = "model.pt"
class PyTorchModel(kfserving.KFModel):
    """KFServing model wrapper that serves a PyTorch model.

    The model directory must contain the serialized weights
    (``model.pt``) and exactly one ``.py`` file that defines the model
    class named ``model_class_name``.
    """

    def __init__(self, name: str, model_class_name: str, model_dir: str):
        super().__init__(name)
        self.name = name
        self.model_class_name = model_class_name
        self.model_dir = model_dir
        self.ready = False
        self.model = None
        # Prefer the first CUDA device when one is available; fall back to CPU.
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    def load(self) -> bool:
        """Download the model dir, import the model class and load the weights.

        Raises:
            Exception: when zero or more than one ``.py`` file is found.
        """
        model_file_dir = kfserving.Storage.download(self.model_dir, self.name)
        model_file = os.path.join(model_file_dir, PYTORCH_FILE)

        py_files = [entry for entry in os.listdir(model_file_dir)
                    if entry.endswith('.py')]
        if not py_files:
            raise Exception('Missing PyTorch Model Class File.')
        if len(py_files) > 1:
            raise Exception('More than one Python file is detected',
                            'Only one Python file is allowed within model_dir.')
        model_class_file = os.path.join(model_file_dir, py_files[0])

        # Load the python class into memory
        sys.path.append(os.path.dirname(model_class_file))
        module_name = os.path.basename(model_class_file).split('.')[0].replace('-', '_')
        model_class = getattr(importlib.import_module(module_name),
                              self.model_class_name)

        # Make sure the model weight is transform with the right device in this machine
        self.model = model_class().to(self.device)
        self.model.load_state_dict(torch.load(model_file, map_location=self.device))
        self.model.eval()
        self.ready = True
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Run a no-grad forward pass over ``request["instances"]``."""
        inputs = []
        with torch.no_grad():
            try:
                inputs = torch.tensor(request["instances"]).to(self.device)
            except Exception as e:
                raise TypeError(
                    "Failed to initialize Torch Tensor from inputs: %s, %s" % (e, inputs))
            try:
                return {"predictions": self.model(inputs).tolist()}
            except Exception as e:
                raise Exception("Failed to predict %s" % e)
-----
import kfserving
import argparse
from pytorchserver import PyTorchModel
DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"
DEFAULT_MODEL_CLASS_NAME = "PyTorchModel"

# Reuse the shared kfserver CLI options and add the PyTorch-specific ones.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model directory')
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                    help='The name that the model is served under.')
parser.add_argument('--model_class_name', default=DEFAULT_MODEL_CLASS_NAME,
                    help='The class name for the model.')
# parse_known_args: unknown flags belong to the shared server parser.
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    served_model = PyTorchModel(args.model_name, args.model_class_name,
                                args.model_dir)
    served_model.load()
    kfserving.KFServer().start([served_model])
sklearn server
import kfserving
import joblib
import numpy as np
import os
from typing import Dict
# Base filename (without extension) the serialized sklearn model must use.
MODEL_BASENAME = "model"
# Serialization formats probed by load(), in priority order.
MODEL_EXTENSIONS = [".joblib", ".pkl", ".pickle"]
class SKLearnModel(kfserving.KFModel):  # pylint:disable=c-extension-no-member
    """KFServing model wrapper that serves a scikit-learn model.

    The model directory must contain an artifact named
    ``model.joblib`` / ``model.pkl`` / ``model.pickle``.
    """

    def __init__(self, name: str, model_dir: str):
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.ready = False
        # Fix: initialize the attribute so predict() before load() fails with
        # a clear wrapped error instead of an AttributeError on a missing name
        # (consistent with PyTorchModel, which sets self.model = None).
        self._model = None

    def load(self) -> bool:
        """Download model_dir and deserialize the first matching artifact.

        Returns:
            True when an artifact was found and loaded; False otherwise
            (self.ready stays False in that case).
        """
        model_path = kfserving.Storage.download(self.model_dir)
        paths = [os.path.join(model_path, MODEL_BASENAME + model_extension)
                 for model_extension in MODEL_EXTENSIONS]
        for path in paths:
            if os.path.exists(path):
                self._model = joblib.load(path)
                self.ready = True
                break
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Predict over ``request["instances"]``; returns ``{"predictions": [...]}``."""
        instances = request["instances"]
        try:
            inputs = np.array(instances)
        except Exception as e:
            raise Exception(
                "Failed to initialize NumPy array from inputs: %s, %s" % (e, instances))
        try:
            result = self._model.predict(inputs).tolist()
            return {"predictions": result}
        except Exception as e:
            raise Exception("Failed to predict %s" % e)
------
import os
from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS
from sklearnserver import SKLearnModel
class SKLearnModelRepository(KFModelRepository):
    """Repository that loads scikit-learn models from the local mount dir."""

    def __init__(self, model_dir: str = MODEL_MOUNT_DIRS):
        super().__init__(model_dir)

    async def load(self, name: str) -> bool:
        """Load model `name` from `<models_dir>/<name>` and register it on success."""
        model_path = os.path.join(self.models_dir, name)
        model = SKLearnModel(name, model_path)
        loaded = model.load()
        if loaded:
            self.update(model)
        return model.ready
----
# Fix: first import line was truncated ("mport argparse") — a SyntaxError.
import argparse
import logging
import sys

import kfserving
from sklearnserver import SKLearnModel, SKLearnModelRepository

DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"

# Reuse the shared kfserver CLI options and add the sklearn-specific ones.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model binary')
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                    help='The name that the model is served under.')
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    model = SKLearnModel(args.model_name, args.model_dir)
    try:
        model.load()
    except Exception:
        ex_type, ex_value, _ = sys.exc_info()
        # Lazy %-args: identical message, formatting deferred to the handler.
        logging.error("fail to load model %s from dir %s. "
                      "exception type %s, exception msg: %s",
                      args.model_name, args.model_dir, ex_type, ex_value)
        model.ready = False
    # Register the repository for on-demand (re)loads; start with the
    # preloaded model only when it actually loaded.
    kfserving.KFServer(registered_models=SKLearnModelRepository(args.model_dir)).start(
        [model] if model.ready else [])
xgboost server
import kfserving
import xgboost as xgb
import numpy as np
from xgboost import XGBModel
import os
from typing import Dict
# Expected filename of the serialized XGBoost booster inside model_dir.
BOOSTER_FILE = "model.bst"
class XGBoostModel(kfserving.KFModel):
    """KFServing model wrapper that serves an XGBoost booster.

    A pre-built booster may be injected via the constructor; otherwise
    load() reads ``model.bst`` from the (possibly remote) model_dir.
    """

    def __init__(self, name: str, model_dir: str, nthread: int,
                 booster: XGBModel = None):
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.nthread = nthread
        # Fix: explicitly start not-ready (siblings SKLearnModel/PmmlModel do
        # this); previously ready was only assigned when a booster was passed.
        self.ready = False
        if booster is not None:
            self._booster = booster
            self.ready = True

    def load(self) -> bool:
        """Download model_dir and load the booster from model.bst."""
        model_file = os.path.join(
            kfserving.Storage.download(self.model_dir), BOOSTER_FILE)
        self._booster = xgb.Booster(params={"nthread": self.nthread},
                                    model_file=model_file)
        self.ready = True
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Predict over ``request["instances"]``; returns ``{"predictions": [...]}``."""
        try:
            # Use of list as input is deprecated see https://github.com/dmlc/xgboost/pull/3970
            dmatrix = xgb.DMatrix(np.array(request["instances"]), nthread=self.nthread)
            # Fix: Booster.predict returns a numpy array, not a DMatrix.
            result: np.ndarray = self._booster.predict(dmatrix)
            return {"predictions": result.tolist()}
        except Exception as e:
            raise Exception("Failed to predict %s" % e)
----
import os
from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS
from xgbserver import XGBoostModel
class XGBoostModelRepository(KFModelRepository):
    """Repository that loads XGBoost models from the local mount dir."""

    def __init__(self, model_dir: str = MODEL_MOUNT_DIRS, nthread: int = 1):
        super().__init__(model_dir)
        self.nthread = nthread

    async def load(self, name: str) -> bool:
        """Load model `name` from `<models_dir>/<name>` and register it on success."""
        model_path = os.path.join(self.models_dir, name)
        model = XGBoostModel(name, model_path, self.nthread)
        loaded = model.load()
        if loaded:
            self.update(model)
        return model.ready
----
import argparse
import logging
import sys

import kfserving
from xgbserver import XGBoostModel, XGBoostModelRepository

DEFAULT_MODEL_NAME = "default"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"
DEFAULT_NTHREAD = 1

# Reuse the shared kfserver CLI options and add the XGBoost-specific ones.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])  # pylint:disable=c-extension-no-member
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model directory')
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                    help='The name that the model is served under.')
# Fix: without type=int a CLI-supplied value arrives as a string and would be
# passed to xgb.Booster(params={"nthread": ...}) unconverted.
parser.add_argument('--nthread', default=DEFAULT_NTHREAD, type=int,
                    help='Number of threads to use by XGBoost.')
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    model = XGBoostModel(args.model_name, args.model_dir, args.nthread)
    try:
        model.load()
    except Exception:
        ex_type, ex_value, _ = sys.exc_info()
        # Lazy %-args: identical message, formatting deferred to the handler.
        logging.error("fail to load model %s from dir %s. "
                      "exception type %s, exception msg: %s",
                      args.model_name, args.model_dir, ex_type, ex_value)
        model.ready = False
    kfserving.KFServer(registered_models=XGBoostModelRepository(args.model_dir, args.nthread))\
        .start([model] if model.ready else [])  # pylint:disable=c-extension-no-member
pmml server
import os
import kfserving
from jpmml_evaluator import make_evaluator
from jpmml_evaluator.py4j import launch_gateway, Py4JBackend
from typing import Dict
# Base filename (without extension) the serialized PMML model must use.
MODEL_BASENAME = "model"
# File extensions probed by load().
MODEL_EXTENSIONS = ['.pmml']
class PmmlModel(kfserving.KFModel):
    """KFServing model wrapper that evaluates PMML models via jpmml_evaluator."""

    def __init__(self, name: str, model_dir: str):
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.ready = False
        self.evaluator = None
        self.input_fields = []
        self._gateway = None
        self._backend = None

    def load(self) -> bool:
        """Download model_dir, start a Py4J JVM gateway and build the evaluator."""
        model_path = kfserving.Storage.download(self.model_dir)
        for extension in MODEL_EXTENSIONS:
            candidate = os.path.join(model_path, MODEL_BASENAME + extension)
            if not os.path.exists(candidate):
                continue
            # The jpmml evaluator runs on the JVM; a Py4J gateway bridges to it.
            self._gateway = launch_gateway()
            self._backend = Py4JBackend(self._gateway)
            self.evaluator = make_evaluator(self._backend, candidate).verify()
            self.input_fields = [field.getName()
                                 for field in self.evaluator.getInputFields()]
            self.ready = True
            break
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Evaluate each instance, zipping values onto the PMML input fields."""
        instances = request["instances"]
        try:
            results = []
            for instance in instances:
                record = dict(zip(self.input_fields, instance))
                results.append(self.evaluator.evaluate(record))
            return {"predictions": results}
        except Exception as e:
            raise Exception("Failed to predict %s" % e)
----
# Fix: first import line was truncated ("port kfserving") — a SyntaxError.
import argparse

import kfserving
from pmmlserver import PmmlModel

DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"

# Reuse the shared kfserver CLI options and add the PMML-specific ones.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model directory')
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                    help='The name that the model is served under.')
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    model = PmmlModel(args.model_name, args.model_dir)
    model.load()
    kfserving.KFServer().start([model])
https://github.com/kubeflow/kfserving/tree/master/python