Component

JeongChaeJin·2022년 11월 28일
0

모두의 MLOps (Kubeflow)

목록 보기
13/21

Component

  • 컴포넌트 작성을 위해서는 2가지 내용을 작성해야 한다.
      1. Component Contents 작성
      1. Component Wrapper 작성

Write

Component Contents

  • 흔히 작성하는 Python Code와 다르지 않다.
print(number)
  • 숫자를 입력받고 출력한 뒤 반환하는 컴포넌트의 예시다.
  • 하지만 number라는 변수가 정의되어있지 않아 오류가 날 것이다.
  • 이런 Component contents에서 필요한 값들은 Config로 정의한다.
  • 컴포넌트 콘텐츠를 실행하기 위해 필요한 Config는 컴포넌트 래퍼에서 전달 되어야 한다.

Component Wrapper

Define a standalone Python function

def print_and_return_number(number: int) -> int:
    print(number)
    return number
  • 콘텐츠에서 필요한 Config를 래퍼의 argument로 추가한다. Type Hint도 작성해야된다.
  • Kubeflow에서는 Pipeline을 Kubeflow Format으로 변환할할 때 컴포넌트 간 연결에서 정해진 입력과 출력의 타입이 일치하는지 체크한다.
    • 만약, 컴포넌트가 필요로 하는 입력과 다른 컴포넌트로부터 전달받은 출력의 포맷이 일치하지 않을 경우 파이프라인 생성을 할 수 없다.
  • 위 처럼 argument + type hint, 반환 type hint를 통해 Component Wrapper를 완성한다.
from typing import NamedTuple


def divide_and_return_number(
    number: int,
) -> NamedTuple("DivideOutputs", [("quotient", int), ("remainder", int)]):
    from collections import namedtuple

    quotient, remainder = divmod(number, 2)
    print("quotient is", quotient)
    print("remainder is", remainder)

    divide_outputs = namedtuple(
        "DivideOutputs",
        [
            "quotient",
            "remainder",
        ],
    )
    return divide_outputs(quotient, remainder)
  • 단일 값이 아닌 여러 값을 반환하려면, collentions.namedtuple을 이용해야 한다.
  • 위 예시는 숫자를 2로 나눈 몫과 나머지를 반환하는 컴포넌트이다.

Convert to Kubeflow Format

from kfp.components import create_component_from_func

@create_component_from_func
def print_and_return_number(number: int) -> int:
    print(number)
    return number
  • 작성한 Component를 Kubeflow에서 사용할 수 있는 format으로 변환한다.
    • kfp.components.create_component_from_func 사용

Share component with yaml file

  • python code로 공유할 수 없는 경우, yaml 파일로 컴포넌트를 공유해 사용할 수 있다.
from kfp.components import create_component_from_func

@create_component_from_func
def print_and_return_number(number: int) -> int:
    print(number)
    return number

if __name__ == "__main__":
    print_and_return_number.component_spec.save("print_and_return_number.yaml")
  • 컴포넌트를 yaml 파일로 변환하는 코드이다.
name: Print and return number
inputs:
- {name: number, type: Integer}
outputs:
- {name: Output, type: Integer}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def print_and_return_number(number):
          print(number)
          return number

      def _serialize_int(int_value: int) -> str:
          if isinstance(int_value, str):
              return int_value
          if not isinstance(int_value, int):
              raise TypeError('Value "{}" has type "{}" instead of int.'.format(str(int_value), str(type(int_value))))
          return str(int_value)

      import argparse
      _parser = argparse.ArgumentParser(prog='Print and return number', description='')
      _parser.add_argument("--number", dest="number", type=int, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = print_and_return_number(**_parsed_args)

      _outputs = [_outputs]

      _output_serializers = [
          _serialize_int,

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --number
    - {inputValue: number}
    - '----output-paths'
    - {outputPath: Output}
  • 코드 실행 시 print_and_return_number.yaml 파일에 위 내용으로 생성된다.
from kfp.components import load_component_from_file

print_and_return_number = load_component_from_file("print_and_return_number.yaml")
  • 생성된 파일을 공유해서 kfp.components.load_component_from_file을 통해 pipeline에서 사용이 가능해진다.

How Kubeflow executes component

    1. docker pull <image>
    • 정의된 컴포넌트의 실행 환경 정보가 담긴 이미지를 pull
    1. run command : pull 이미지에서 component contents 실행
1. docker pull python:3.7
2. print(number)
  • 이전 코드를 예시로 들면 위 처럼 동작한다.

InputPath/OutputPath

def train_from_csv(
    train_data_path: str,
    train_target_path: str,
    model_path: str,
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)
  • 컴포넌트의 입력과 출력에대해 Type Hint가 필요하다는 것은 알고 있다. 하지만 json에서 사용할 수 없는 Type인 dataframe, model과 같이 복잡한 객체들은 어떻게 해야할까?
  • 컴포넌트들은 기본적으로 컨테이너 위에서 동작하므로 서로 독립적으로 실행되어 메모리를 공유하고 있지 않다. 이는 컴포넌트 간 넘겨줄 수 있는 정보는 json으로만 가능하다는 것이다.
  • 따라서, DataFrame, Model 같은 것은 json 형식으로 변활 할 수 없는 타입 객체이며 다른 방법을 통해야한다. 이를 해결하기위해 kubeflow에서는 json-serializable하지 않은 타입의 객체는 메모리 대신 파일에 데이터를 저장한 뒤 그 파일을 이용해 정보를 전달한다.
    • 저장된 파일의 경로는 str이어서 컴포넌트 간에 전달이 가능하기 때문이다.
  • kubeflow에서는 minio를 통해 파일을 저장하므로 User는 실행하기 전에는 파일의 경로를 알 수 없다. 이를 위해 kubeflow에서는 입출력 경로 매직을 제공하는데, InputPath, OutputPath가 그것들이다.
from kfp.components import InputPath, OutputPath

def train_from_csv(
    train_data_path: InputPath("csv"),
    train_target_path: InputPath("csv"),
    model_path: OutputPath("dill"),
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)
  • 데이터를 생성하고 반환하는 컴포넌트에서는 data_path: OutputPath(), 데이터를 받는 컴포넌트에서는 data_path: InputPath() 이런식으로 argument를 생성하면 된다.
  • 위 처럼 만든 파이프라인에서 서로 연결하면 kubeflow에서 필요한 경로를 자동으로 생성 후 입력해주므로 유저는 경로를 신경쓰지 않고 컴포넌트 간의 관계만 신경쓰면 된다.
  • InputPath, OutputPath에 입력한 string은 입력 또는 출력하고자 하는 파일의 format이다.
    • 해당 format을 입력한다고, 강제되는 것은 아니다.
    • 고정된 file format이 아니면 입력하지 않으면된다. (Any와 같은 역할)

Convert to Kubeflow format

from kfp.components import InputPath, OutputPath, create_component_from_func


@create_component_from_func
def train_from_csv(
    train_data_path: InputPath("csv"),
    train_target_path: InputPath("csv"),
    model_path: OutputPath("dill"),
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)

Rule to use InputPath/OutputPath

  • 위 두 argument는 파이프라인으로 작섷알 때 지켜야하는 규칙이 있다.

Load Data Component

  • 데이터를 생성하는 Component
from functools import partial

from kfp.components import InputPath, OutputPath, create_component_from_func


@create_component_from_func
def load_iris_data(
    data_path: OutputPath("csv"),
    target_path: OutputPath("csv"),
):
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()

    data = pd.DataFrame(iris["data"], columns=iris["feature_names"])
    target = pd.DataFrame(iris["target"], columns=["target"])

    data.to_csv(data_path, index=False)
    target.to_csv(target_path, index=False)

Write Pipeline

from kfp.dsl import pipeline


@pipeline(name="complex_pipeline")
def complex_pipeline(kernel: str):
    iris_data = load_iris_data()
    model = train_from_csv(
        train_data=iris_data.outputs["data"],
        train_target=iris_data.outputs["target"],
        kernel=kernel,
    )
  • argument 중 경로와 관련된 것들에 대해 _path 접미사가 모두 사라졌다.
    • iris_data.ouputs["data_path"]가 아닌 iris_data.outputs["data"]로 접근
  • 이는 kubeflow에서 정한 법칙으로 InputPath, OutputPath로 생성된 경로는 파이프라인에서 접근할 때 _path 접미사를 생략하여 접근한다.

Component Environment

  • 이전에 만든 코드대로 Pipeline 실행 시 실패하게 된다.

Convert to kubeflow format

from kfp.components import InputPath, OutputPath, create_component_from_func


@create_component_from_func
def train_from_csv(
    train_data_path: InputPath("csv"),
    train_target_path: InputPath("csv"),
    model_path: OutputPath("dill"),
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)


if __name__ == "__main__":
    train_from_csv.component_spec.save("train_from_csv.yaml")
  • 위 스크립트를 실행한다.
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def train_from_csv(
          train_data_path,
          train_target_path,
          model_path,
          kernel,
      ):
          import dill
          import pandas as pd

          from sklearn.svm import SVC

          train_data = pd.read_csv(train_data_path)
          train_target = pd.read_csv(train_target_path)

          clf = SVC(kernel=kernel)
          clf.fit(train_data, train_target)

          with open(model_path, mode="wb") as file_writer:
              dill.dump(clf, file_writer)

      import argparse
      _parser = argparse.ArgumentParser(prog='Train from csv', description='')
      _parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = train_from_csv(**_parsed_args)
    args:
    - --train-data
    - {inputPath: train_data}
    - --train-target
    - {inputPath: train_target}
    - --kernel
    - {inputValue: kernel}
    - --model
    - {outputPath: model}
  • 스크립트 실행 시 위와 같은 내용의 train_from_csv.yaml 파일을 얻을 수 있다.
  • 이 컴포넌트는 2가지 실행이 이뤄진다.
      1. docker pull python:3.7
      1. run command
  • 컴포넌트 래퍼가 실행되는 방식때문에 이전 코드는 오류가 발생한다.
  • Kubeflow는 쿠버네티스를 이용하므로 컴포넌트 래퍼는 각각 독립된 컨테이너 위에서 Component content를 실행한다 !
    • train_from_csv.yaml은 python:3.7 image에서 실행된다. 이 이미지에는 dill, pandans, sklearn이 설치되어 있지 않다.

Package 추가 방법

def create_component_from_func(
    func: Callable,
    output_component_file: Optional[str] = None,
    base_image: Optional[str] = None,
    packages_to_install: List[str] = None,
    annotations: Optional[Mapping[str, str]] = None,
):
  • Kubeflow 변환 과정에서 두 가지 방법을 통해 패키지를 추가할 수 있다.
      1. base_image 사용
      1. package_to_install 사용
  • 위 컴포넌트 컴파일 시 사용했던 함수 create_component_from_func가 받는 argument를 나열한 것이다.
    • func : component로 만들 component wrapper 함수
    • base_image : component wrapper가 실행할 image
    • packages_to_install : component에서 사용해서 추가로 설치해야하는 패키지

1. base_image

  • Component가 실행되는 순서
      1. docker pull base_image
      1. pip install packages_to_install
      1. run command
  • base_image에 필요한 패키지가 모두 설치되어 있으면 따로 설치 없이 사용 가능하다.
    • docker file에 추가해서 다시 build 하든 하면된다.
from functools import partial
from kfp.components import InputPath, OutputPath, create_component_from_func

@partial(
    create_component_from_func,
    base_image="ghcr.io/mlops-for-all/base-image:latest",
)
def train_from_csv(
    train_data_path: InputPath("csv"),
    train_target_path: InputPath("csv"),
    model_path: OutputPath("dill"),
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)

if __name__ == "__main__":
    train_from_csv.component_spec.save("train_from_csv.yaml")
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
  container:
    image: ghcr.io/mlops-for-all/base-image:latest
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def train_from_csv(
          train_data_path,
          train_target_path,
          model_path,
          kernel,
      ):
          import dill
          import pandas as pd

          from sklearn.svm import SVC

          train_data = pd.read_csv(train_data_path)
          train_target = pd.read_csv(train_target_path)

          clf = SVC(kernel=kernel)
          clf.fit(train_data, train_target)

          with open(model_path, mode="wb") as file_writer:
              dill.dump(clf, file_writer)

      import argparse
      _parser = argparse.ArgumentParser(prog='Train from csv', description='')
      _parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = train_from_csv(**_parsed_args)
    args:
    - --train-data
    - {inputPath: train_data}
    - --train-target
    - {inputPath: train_target}
    - --kernel
    - {inputValue: kernel}
    - --model
    - {outputPath: model}
  • 스크립트 실행 후 yaml 파일을 보면 image가 바뀐 것을 확인할 수 있다.

2. packages_to_install

  • 패키지 추가될 때마다 docker image를 계속 생성하는 작업은 시간이 많이 소요된다.
  • packages_to_install argument로 패키지를 컨테이너에 쉽게 추가 가능하다.
from functools import partial
from kfp.components import InputPath, OutputPath, create_component_from_func

@partial(
    create_component_from_func,
    packages_to_install=["dill==0.3.4", "pandas==1.3.4", "scikit-learn==1.0.1"],
)
def train_from_csv(
    train_data_path: InputPath("csv"),
    train_target_path: InputPath("csv"),
    model_path: OutputPath("dill"),
    kernel: str,
):
    import dill
    import pandas as pd

    from sklearn.svm import SVC

    train_data = pd.read_csv(train_data_path)
    train_target = pd.read_csv(train_target_path)

    clf = SVC(kernel=kernel)
    clf.fit(train_data, train_target)

    with open(model_path, mode="wb") as file_writer:
        dill.dump(clf, file_writer)

if __name__ == "__main__":
    train_from_csv.component_spec.save("train_from_csv.yaml")
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'dill==0.3.4' 'pandas==1.3.4' 'scikit-learn==1.0.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
      python3 -m pip install --quiet --no-warn-script-location 'dill==0.3.4' 'pandas==1.3.4'
      'scikit-learn==1.0.1' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def train_from_csv(
          train_data_path,
          train_target_path,
          model_path,
          kernel,
      ):
          import dill
          import pandas as pd

          from sklearn.svm import SVC

          train_data = pd.read_csv(train_data_path)
          train_target = pd.read_csv(train_target_path)

          clf = SVC(kernel=kernel)
          clf.fit(train_data, train_target)

          with open(model_path, mode="wb") as file_writer:
              dill.dump(clf, file_writer)

      import argparse
      _parser = argparse.ArgumentParser(prog='Train from csv', description='')
      _parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = train_from_csv(**_parsed_args)
    args:
    - --train-data
    - {inputPath: train_data}
    - --train-target
    - {inputPath: train_target}
    - --kernel
    - {inputValue: kernel}
    - --model
    - {outputPath: model}
  • 스크립트 실행 후의 yaml파일이다.
  • 실행 순서를 살펴보자.
      1. docker pull python:3.7
      1. pip install ....
      1. run command
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'dill==0.3.4' 'pandas==1.3.4' 'scikit-learn==1.0.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
      python3 -m pip install --quiet --no-warn-script-location 'dill==0.3.4' 'pandas==1.3.4'
      'scikit-learn==1.0.1' --user) && "$0" "$@"
  • 위 구문이 자동으로 추가되어 필요한 패키지가 설치되므로 오류 없이 정상적으로 실행된다.
profile
OnePunchLotto

0개의 댓글