Serving Server - kfserving

우야·2021년 6월 15일
0

KFModel

  • kfserving에서 각 프레임워크별로 서버를 만들 때 상속받는 class로, kfserving의 기본 flow가 구현되어 있다.
    • http client(predictor, transformer 등 통신)
    • load model
    • preprocess
    • predict
    • postprocess
    • explain

KFServer

  • Serving을 위한 Server 구현부
    • 아래의 외부 image를 사용하는 것을 제외하고, code를 Docker build하는 부분에서 공통으로 사용하는 Server
    • Server는 tornado web app server를 사용
    • KFServer.start(KFModel)로 Server를 구동

ML Platform Serving server image or code

tensorflow server

"image": "tensorflow/serving"

onnx server

"image": "mcr.microsoft.com/onnxruntime/server"

triton server

"image": "nvcr.io/nvidia/tritonserver"

lightgbm

"image": "kfserving/lgbserver"

pytorch server


import kfserving
import os
from typing import Dict
import torch
import importlib
import sys

# File name of the saved state dict that PyTorchModel.load looks for inside
# the downloaded model directory.
PYTORCH_FILE = "model.pt"


class PyTorchModel(kfserving.KFModel):
    """KFServing model that serves a PyTorch module restored from a state dict.

    The model directory must contain the weights file ``model.pt`` and exactly
    one ``.py`` file defining the model class named ``model_class_name``.
    """

    def __init__(self, name: str, model_class_name: str, model_dir: str):
        """Store the configuration; nothing is downloaded or loaded yet.

        Args:
            name: Name the model is served under.
            model_class_name: Name of the class to look up in the model's
                Python file.
            model_dir: Location handed to ``kfserving.Storage.download``.
        """
        super().__init__(name)
        self.name = name
        self.model_class_name = model_class_name
        self.model_dir = model_dir
        self.ready = False
        self.model = None
        # Prefer the first CUDA device when one is available, else run on CPU.
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    def load(self) -> bool:
        """Download the model directory, import the model class, load weights.

        Returns:
            True once the model is in eval mode and ready to serve.

        Raises:
            Exception: If the model directory holds no ``.py`` file or more
                than one.
        """
        model_file_dir = kfserving.Storage.download(self.model_dir, self.name)
        model_file = os.path.join(model_file_dir, PYTORCH_FILE)
        py_files = [filename for filename in os.listdir(model_file_dir)
                    if filename.endswith('.py')]
        if not py_files:
            raise Exception('Missing PyTorch Model Class File.')
        if len(py_files) > 1:
            # A single message string: passing two positional arguments made
            # the exception text render as a tuple.
            raise Exception('More than one Python file is detected. '
                            'Only one Python file is allowed within model_dir.')
        model_class_file = os.path.join(model_file_dir, py_files[0])

        # Load the python class into memory. Guard the sys.path insertion so
        # that repeated load() calls do not pile up duplicate entries.
        module_dir = os.path.dirname(model_class_file)
        if module_dir not in sys.path:
            sys.path.append(module_dir)
        # NOTE(review): '-' in the file name is replaced by '_' before the
        # import, so the lookup uses a name that differs from the file on
        # disk -- confirm this is intended.
        modulename = os.path.basename(model_class_file).split('.')[0].replace('-', '_')
        model_class = getattr(importlib.import_module(modulename), self.model_class_name)

        # Make sure the model weights are mapped onto the device of this machine.
        self.model = model_class().to(self.device)
        # NOTE: torch.load unpickles the file; only point model_dir at trusted
        # model artifacts.
        self.model.load_state_dict(torch.load(model_file, map_location=self.device))
        self.model.eval()
        self.ready = True
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Run inference on ``request["instances"]``.

        Args:
            request: Mapping whose ``"instances"`` entry is converted to a
                tensor with ``torch.tensor``.

        Returns:
            ``{"predictions": ...}`` holding the model output as nested lists.

        Raises:
            TypeError: If the instances are missing or cannot be converted to
                a tensor.
            Exception: If the forward pass or output conversion fails.
        """
        # Kept outside the try so the error message can show what was received.
        instances = None
        with torch.no_grad():
            try:
                instances = request["instances"]
                inputs = torch.tensor(instances).to(self.device)
            except Exception as e:
                raise TypeError(
                    "Failed to initialize Torch Tensor from inputs: %s, %s" % (e, instances)) from e
            try:
                return {"predictions":  self.model(inputs).tolist()}
            except Exception as e:
                raise Exception("Failed to predict %s" % e) from e

-----

import kfserving
import argparse

from pytorchserver import PyTorchModel

# Defaults used when the matching command-line flag is not supplied.
DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"
DEFAULT_MODEL_CLASS_NAME = "PyTorchModel"

# Extend the shared kfserving server parser with the PyTorch server flags.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument(
    '--model_dir',
    required=True,
    help='A URI pointer to the model directory',
)
parser.add_argument(
    '--model_name',
    default=DEFAULT_MODEL_NAME,
    help='The name that the model is served under.',
)
parser.add_argument(
    '--model_class_name',
    default=DEFAULT_MODEL_CLASS_NAME,
    help='The class name for the model.',
)
# parse_known_args returns (namespace, leftover argv); the leftovers are ignored.
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Build the model, load its weights, then serve it.
    model = PyTorchModel(args.model_name, args.model_class_name, args.model_dir)
    model.load()
    server = kfserving.KFServer()
    server.start([model])
    

sklearn server


import kfserving
import joblib
import numpy as np
import os
from typing import Dict

# Base file name (without extension) of the serialized model file.
MODEL_BASENAME = "model"
# Extensions appended to MODEL_BASENAME, tried in this order by SKLearnModel.load.
MODEL_EXTENSIONS = [".joblib", ".pkl", ".pickle"]


class SKLearnModel(kfserving.KFModel):  # pylint:disable=c-extension-no-member
    """KFServing model backed by an estimator restored with ``joblib.load``."""

    def __init__(self, name: str, model_dir: str):
        """Remember the serving name and model location; loading is deferred.

        Args:
            name: Name the model is served under.
            model_dir: Location handed to ``kfserving.Storage.download``.
        """
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.ready = False

    def load(self) -> bool:
        """Download the model directory and load the first matching model file.

        Candidate files are ``model`` plus each entry of ``MODEL_EXTENSIONS``,
        checked in list order.

        Returns:
            True if a model file was found and loaded; otherwise the current
            value of ``self.ready``.
        """
        local_dir = kfserving.Storage.download(self.model_dir)
        for extension in MODEL_EXTENSIONS:
            candidate = os.path.join(local_dir, MODEL_BASENAME + extension)
            if not os.path.exists(candidate):
                continue
            self._model = joblib.load(candidate)
            self.ready = True
            break
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Predict for ``request["instances"]`` with the loaded estimator.

        Args:
            request: Mapping with an ``"instances"`` entry that is converted
                to a NumPy array.

        Returns:
            ``{"predictions": ...}`` with the estimator output as a list.

        Raises:
            KeyError: If ``"instances"`` is absent from the request.
            Exception: If array conversion or prediction fails.
        """
        payload = request["instances"]
        try:
            features = np.array(payload)
        except Exception as err:
            raise Exception(
                "Failed to initialize NumPy array from inputs: %s, %s" % (err, payload))
        try:
            predictions = self._model.predict(features)
            return {"predictions": predictions.tolist()}
        except Exception as err:
            raise Exception("Failed to predict %s" % err)

------

import os
from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS
from sklearnserver import SKLearnModel


class SKLearnModelRepository(KFModelRepository):
    """Model repository that creates and registers ``SKLearnModel`` instances."""

    def __init__(self, model_dir: str = MODEL_MOUNT_DIRS):
        """Forward the models directory to the base repository."""
        super().__init__(model_dir)

    async def load(self, name: str) -> bool:
        """Load the model stored under ``<models_dir>/<name>``.

        The model is registered through ``self.update`` only when its own
        ``load()`` reports success.

        Returns:
            The model's ``ready`` flag after the load attempt.
        """
        location = os.path.join(self.models_dir, name)
        candidate = SKLearnModel(name, location)
        loaded = candidate.load()
        if loaded:
            self.update(candidate)
        return candidate.ready


----

import argparse
import logging
import sys

import kfserving
from sklearnserver import SKLearnModel, SKLearnModelRepository

# Defaults used when the matching command-line flag is not supplied.
DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"

# Extend the shared kfserving server parser with the sklearn server flags.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument(
    '--model_dir',
    required=True,
    help='A URI pointer to the model binary',
)
parser.add_argument(
    '--model_name',
    default=DEFAULT_MODEL_NAME,
    help='The name that the model is served under.',
)
# parse_known_args returns (namespace, leftover argv); the leftovers are ignored.
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    model = SKLearnModel(args.model_name, args.model_dir)
    try:
        model.load()
    except Exception as load_error:
        # A failed initial load is logged, and the server still starts without the model.
        logging.error(f"fail to load model {args.model_name} from dir {args.model_dir}. "
                      f"exception type {type(load_error)}, exception msg: {load_error}")
        model.ready = False
    repository = SKLearnModelRepository(args.model_dir)
    server = kfserving.KFServer(registered_models=repository)
    server.start([model] if model.ready else [])

xgboost server


import kfserving
import xgboost as xgb
import numpy as np
from xgboost import XGBModel
import os
from typing import Dict

# File name of the saved booster that XGBoostModel.load looks for inside the
# downloaded model directory.
BOOSTER_FILE = "model.bst"


class XGBoostModel(kfserving.KFModel):
    """KFServing model backed by an XGBoost booster."""

    def __init__(self, name: str, model_dir: str, nthread: int,
                 booster: XGBModel = None):
        """Store the configuration and optionally adopt a pre-built booster.

        Args:
            name: Name the model is served under.
            model_dir: Location handed to ``kfserving.Storage.download``.
            nthread: Thread count passed to the booster and to each DMatrix.
            booster: Optional already-loaded booster; when given, the model
                is ready without calling ``load()``.
        """
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.nthread = nthread
        # Always define both attributes, as the sibling model classes do, so
        # that predict() before load() fails with a clear message.
        self._booster = booster
        self.ready = booster is not None

    def load(self) -> bool:
        """Download the model directory and load ``model.bst`` as a Booster.

        Returns:
            True once the booster has been created.
        """
        model_file = os.path.join(
            kfserving.Storage.download(self.model_dir), BOOSTER_FILE)
        self._booster = xgb.Booster(params={"nthread": self.nthread},
                                    model_file=model_file)
        self.ready = True
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Predict for ``request["instances"]`` with the loaded booster.

        Args:
            request: Mapping with an ``"instances"`` entry that is converted
                to a NumPy array and wrapped in a ``DMatrix``.

        Returns:
            ``{"predictions": ...}`` with the booster output as a list.

        Raises:
            Exception: If input conversion or prediction fails.
        """
        try:
            # Use of list as input is deprecated see https://github.com/dmlc/xgboost/pull/3970
            dmatrix = xgb.DMatrix(np.array(request["instances"]), nthread=self.nthread)
            # The result is an array (``.tolist()`` is called on it), not a DMatrix.
            result: np.ndarray = self._booster.predict(dmatrix)
            return {"predictions": result.tolist()}
        except Exception as e:
            raise Exception("Failed to predict %s" % e) from e

----

import os
from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS
from xgbserver import XGBoostModel


class XGBoostModelRepository(KFModelRepository):
    """Model repository that creates and registers ``XGBoostModel`` instances."""

    def __init__(self, model_dir: str = MODEL_MOUNT_DIRS, nthread: int = 1):
        """Forward the models directory to the base and keep the thread count."""
        super().__init__(model_dir)
        self.nthread = nthread

    async def load(self, name: str) -> bool:
        """Load the model stored under ``<models_dir>/<name>``.

        The model is registered through ``self.update`` only when its own
        ``load()`` reports success.

        Returns:
            The model's ``ready`` flag after the load attempt.
        """
        location = os.path.join(self.models_dir, name)
        candidate = XGBoostModel(name, location, self.nthread)
        loaded = candidate.load()
        if loaded:
            self.update(candidate)
        return candidate.ready

----

import argparse
import logging
import sys
import kfserving


from xgbserver import XGBoostModel, XGBoostModelRepository

# Defaults used when the matching command-line flag is not supplied.
DEFAULT_MODEL_NAME = "default"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"
DEFAULT_NTHREAD = 1

# Extend the shared kfserving server parser with the XGBoost server flags.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])  # pylint:disable=c-extension-no-member
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model directory')
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                    help='The name that the model is served under.')
# type=int: without it a value given on the command line stays a string,
# while the default is an int, and the string would be handed to XGBoost.
parser.add_argument('--nthread', default=DEFAULT_NTHREAD, type=int,
                    help='Number of threads to use by XGBoost.')
# parse_known_args returns (namespace, leftover argv); the leftovers are ignored.
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    model = XGBoostModel(args.model_name, args.model_dir, args.nthread)
    try:
        model.load()
    except Exception:
        # A failed initial load is logged, and the server still starts without the model.
        ex_type, ex_value, _ = sys.exc_info()
        logging.error(f"fail to load model {args.model_name} from dir {args.model_dir}. "
                      f"exception type {ex_type}, exception msg: {ex_value}")
        model.ready = False

    kfserving.KFServer(registered_models=XGBoostModelRepository(args.model_dir, args.nthread))\
        .start([model] if model.ready else [])  # pylint:disable=c-extension-no-member

pmml server


import os

import kfserving
from jpmml_evaluator import make_evaluator
from jpmml_evaluator.py4j import launch_gateway, Py4JBackend
from typing import Dict

# Base file name (without extension) of the PMML model file.
MODEL_BASENAME = "model"

# Extensions appended to MODEL_BASENAME when PmmlModel.load searches for the file.
MODEL_EXTENSIONS = ['.pmml']


class PmmlModel(kfserving.KFModel):
    """KFServing model that evaluates a PMML file through jpmml_evaluator."""

    def __init__(self, name: str, model_dir: str):
        """Remember the serving name and model location; loading is deferred.

        Args:
            name: Name the model is served under.
            model_dir: Location handed to ``kfserving.Storage.download``.
        """
        super().__init__(name)
        self.name = name
        self.model_dir = model_dir
        self.ready = False
        self.evaluator = None
        self.input_fields = []
        self._gateway = None
        self._backend = None

    def load(self) -> bool:
        """Download the model directory and build an evaluator for the PMML file.

        Candidate files are ``model`` plus each entry of ``MODEL_EXTENSIONS``.
        For the first one that exists, a Py4J gateway and backend are started,
        the evaluator is created and verified, and its input field names are
        cached.

        Returns:
            True if a model file was found and loaded; otherwise the current
            value of ``self.ready``.
        """
        local_dir = kfserving.Storage.download(self.model_dir)
        for extension in MODEL_EXTENSIONS:
            candidate = os.path.join(local_dir, MODEL_BASENAME + extension)
            if not os.path.exists(candidate):
                continue
            self._gateway = launch_gateway()
            self._backend = Py4JBackend(self._gateway)
            self.evaluator = make_evaluator(self._backend, candidate).verify()
            self.input_fields = [field.getName()
                                 for field in self.evaluator.getInputFields()]
            self.ready = True
            break
        return self.ready

    def predict(self, request: Dict) -> Dict:
        """Evaluate every instance in ``request["instances"]``.

        Each instance is paired positionally with the cached input field
        names before being passed to the evaluator.

        Returns:
            ``{"predictions": [...]}`` with one evaluator result per instance.

        Raises:
            KeyError: If ``"instances"`` is absent from the request.
            Exception: If evaluating any instance fails.
        """
        instances = request["instances"]
        try:
            result = []
            for row in instances:
                arguments = dict(zip(self.input_fields, row))
                result.append(self.evaluator.evaluate(arguments))
            return {"predictions": result}
        except Exception as err:
            raise Exception("Failed to predict %s" % err)

----
import kfserving
import argparse

from pmmlserver import PmmlModel

# Defaults used when the matching command-line flag is not supplied.
DEFAULT_MODEL_NAME = "model"
DEFAULT_LOCAL_MODEL_DIR = "/tmp/model"

# Extend the shared kfserving server parser with the PMML server flags.
parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument(
    '--model_dir',
    required=True,
    help='A URI pointer to the model directory',
)
parser.add_argument(
    '--model_name',
    default=DEFAULT_MODEL_NAME,
    help='The name that the model is served under.',
)
# parse_known_args returns (namespace, leftover argv); the leftovers are ignored.
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Build the model, load the PMML file, then serve it.
    model = PmmlModel(args.model_name, args.model_dir)
    model.load()
    server = kfserving.KFServer()
    server.start([model])

https://github.com/kubeflow/kfserving/tree/master/python

profile
Fullstack developer

0개의 댓글