Component
- 컴포넌트 작성을 위해서는 2가지 내용을 작성해야 한다.
- Component Contents 작성
- Component Wrapper 작성
Write
Component Contents
- 흔히 작성하는 Python Code와 다르지 않다.
print(number)
- 숫자를 입력받고 출력한 뒤 반환하는 컴포넌트의 예시다.
- 하지만 number라는 변수가 정의되어있지 않아 오류가 날 것이다.
- 이런 Component contents에서 필요한 값들은 Config로 정의한다.
- 컴포넌트 콘텐츠를 실행하기 위해 필요한 Config는 컴포넌트 래퍼에서 전달 되어야 한다.
Component Wrapper
Define a standalone Python function
def print_and_return_number(number: int) -> int:
print(number)
return number
- 콘텐츠에서 필요한 Config를 래퍼의 argument로 추가한다. Type Hint도 작성해야된다.
- Kubeflow에서는 Pipeline을 Kubeflow Format으로 변환할할 때 컴포넌트 간 연결에서 정해진 입력과 출력의 타입이 일치하는지 체크한다.
- 만약, 컴포넌트가 필요로 하는 입력과 다른 컴포넌트로부터 전달받은 출력의 포맷이 일치하지 않을 경우 파이프라인 생성을 할 수 없다.
- 위 처럼 argument + type hint, 반환 type hint를 통해 Component Wrapper를 완성한다.
from typing import NamedTuple
def divide_and_return_number(
number: int,
) -> NamedTuple("DivideOutputs", [("quotient", int), ("remainder", int)]):
from collections import namedtuple
quotient, remainder = divmod(number, 2)
print("quotient is", quotient)
print("remainder is", remainder)
divide_outputs = namedtuple(
"DivideOutputs",
[
"quotient",
"remainder",
],
)
return divide_outputs(quotient, remainder)
- 단일 값이 아닌 여러 값을 반환하려면,
collentions.namedtuple
을 이용해야 한다.
- 위 예시는 숫자를 2로 나눈 몫과 나머지를 반환하는 컴포넌트이다.
from kfp.components import create_component_from_func
@create_component_from_func
def print_and_return_number(number: int) -> int:
print(number)
return number
- 작성한 Component를 Kubeflow에서 사용할 수 있는 format으로 변환한다.
kfp.components.create_component_from_func
사용
Share component with yaml file
- python code로 공유할 수 없는 경우, yaml 파일로 컴포넌트를 공유해 사용할 수 있다.
from kfp.components import create_component_from_func
@create_component_from_func
def print_and_return_number(number: int) -> int:
print(number)
return number
if __name__ == "__main__":
print_and_return_number.component_spec.save("print_and_return_number.yaml")
- 컴포넌트를 yaml 파일로 변환하는 코드이다.
name: Print and return number
inputs:
- {name: number, type: Integer}
outputs:
- {name: Output, type: Integer}
implementation:
container:
image: python:3.7
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def print_and_return_number(number):
print(number)
return number
def _serialize_int(int_value: int) -> str:
if isinstance(int_value, str):
return int_value
if not isinstance(int_value, int):
raise TypeError('Value "{}" has type "{}" instead of int.'.format(str(int_value), str(type(int_value))))
return str(int_value)
import argparse
_parser = argparse.ArgumentParser(prog='Print and return number', description='')
_parser.add_argument("--number", dest="number", type=int, required=True, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = print_and_return_number(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_int,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
args:
- --number
- {inputValue: number}
- '----output-paths'
- {outputPath: Output}
- 코드 실행 시
print_and_return_number.yaml
파일에 위 내용으로 생성된다.
from kfp.components import load_component_from_file
print_and_return_number = load_component_from_file("print_and_return_number.yaml")
- 생성된 파일을 공유해서
kfp.components.load_component_from_file
을 통해 pipeline에서 사용이 가능해진다.
How Kubeflow executes component
docker pull <image>
- 정의된 컴포넌트의 실행 환경 정보가 담긴 이미지를 pull
- run
command
: pull 이미지에서 component contents 실행
1. docker pull python:3.7
2. print(number)
def train_from_csv(
train_data_path: str,
train_target_path: str,
model_path: str,
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
- 컴포넌트의 입력과 출력에대해 Type Hint가 필요하다는 것은 알고 있다. 하지만 json에서 사용할 수 없는 Type인 dataframe, model과 같이 복잡한 객체들은 어떻게 해야할까?
- 컴포넌트들은 기본적으로 컨테이너 위에서 동작하므로 서로 독립적으로 실행되어 메모리를 공유하고 있지 않다. 이는 컴포넌트 간 넘겨줄 수 있는 정보는 json으로만 가능하다는 것이다.
- 따라서, DataFrame, Model 같은 것은 json 형식으로 변활 할 수 없는 타입 객체이며 다른 방법을 통해야한다. 이를 해결하기위해 kubeflow에서는 json-serializable하지 않은 타입의 객체는 메모리 대신 파일에 데이터를 저장한 뒤 그 파일을 이용해 정보를 전달한다.
- 저장된 파일의 경로는 str이어서 컴포넌트 간에 전달이 가능하기 때문이다.
- kubeflow에서는 minio를 통해 파일을 저장하므로 User는 실행하기 전에는 파일의 경로를 알 수 없다. 이를 위해 kubeflow에서는 입출력 경로 매직을 제공하는데, InputPath, OutputPath가 그것들이다.
from kfp.components import InputPath, OutputPath
def train_from_csv(
train_data_path: InputPath("csv"),
train_target_path: InputPath("csv"),
model_path: OutputPath("dill"),
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
- 데이터를 생성하고 반환하는 컴포넌트에서는
data_path: OutputPath()
, 데이터를 받는 컴포넌트에서는 data_path: InputPath()
이런식으로 argument를 생성하면 된다.
- 위 처럼 만든 파이프라인에서 서로 연결하면 kubeflow에서 필요한 경로를 자동으로 생성 후 입력해주므로 유저는 경로를 신경쓰지 않고 컴포넌트 간의 관계만 신경쓰면 된다.
- InputPath, OutputPath에 입력한 string은 입력 또는 출력하고자 하는 파일의 format이다.
- 해당 format을 입력한다고, 강제되는 것은 아니다.
- 고정된 file format이 아니면 입력하지 않으면된다. (Any와 같은 역할)
from kfp.components import InputPath, OutputPath, create_component_from_func
@create_component_from_func
def train_from_csv(
train_data_path: InputPath("csv"),
train_target_path: InputPath("csv"),
model_path: OutputPath("dill"),
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
- 위 두 argument는 파이프라인으로 작섷알 때 지켜야하는 규칙이 있다.
Load Data Component
from functools import partial
from kfp.components import InputPath, OutputPath, create_component_from_func
@create_component_from_func
def load_iris_data(
data_path: OutputPath("csv"),
target_path: OutputPath("csv"),
):
import pandas as pd
from sklearn.datasets import load_iris
iris = load_iris()
data = pd.DataFrame(iris["data"], columns=iris["feature_names"])
target = pd.DataFrame(iris["target"], columns=["target"])
data.to_csv(data_path, index=False)
target.to_csv(target_path, index=False)
Write Pipeline
from kfp.dsl import pipeline
@pipeline(name="complex_pipeline")
def complex_pipeline(kernel: str):
iris_data = load_iris_data()
model = train_from_csv(
train_data=iris_data.outputs["data"],
train_target=iris_data.outputs["target"],
kernel=kernel,
)
- argument 중 경로와 관련된 것들에 대해
_path
접미사가 모두 사라졌다.
iris_data.ouputs["data_path"]
가 아닌 iris_data.outputs["data"]
로 접근
- 이는 kubeflow에서 정한 법칙으로
InputPath
, OutputPath
로 생성된 경로는 파이프라인에서 접근할 때 _path
접미사를 생략하여 접근한다.
Component Environment
- 이전에 만든 코드대로 Pipeline 실행 시 실패하게 된다.
from kfp.components import InputPath, OutputPath, create_component_from_func
@create_component_from_func
def train_from_csv(
train_data_path: InputPath("csv"),
train_target_path: InputPath("csv"),
model_path: OutputPath("dill"),
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
if __name__ == "__main__":
train_from_csv.component_spec.save("train_from_csv.yaml")
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
container:
image: python:3.7
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def train_from_csv(
train_data_path,
train_target_path,
model_path,
kernel,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
import argparse
_parser = argparse.ArgumentParser(prog='Train from csv', description='')
_parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = train_from_csv(**_parsed_args)
args:
- --train-data
- {inputPath: train_data}
- --train-target
- {inputPath: train_target}
- --kernel
- {inputValue: kernel}
- --model
- {outputPath: model}
- 스크립트 실행 시 위와 같은 내용의
train_from_csv.yaml
파일을 얻을 수 있다.
- 이 컴포넌트는 2가지 실행이 이뤄진다.
docker pull python:3.7
- run
command
- 컴포넌트 래퍼가 실행되는 방식때문에 이전 코드는 오류가 발생한다.
- Kubeflow는 쿠버네티스를 이용하므로 컴포넌트 래퍼는 각각 독립된 컨테이너 위에서 Component content를 실행한다 !
- train_from_csv.yaml은
python:3.7
image에서 실행된다. 이 이미지에는 dill, pandans, sklearn
이 설치되어 있지 않다.
Package 추가 방법
def create_component_from_func(
func: Callable,
output_component_file: Optional[str] = None,
base_image: Optional[str] = None,
packages_to_install: List[str] = None,
annotations: Optional[Mapping[str, str]] = None,
):
- Kubeflow 변환 과정에서 두 가지 방법을 통해 패키지를 추가할 수 있다.
base_image
사용
package_to_install
사용
- 위 컴포넌트 컴파일 시 사용했던 함수
create_component_from_func
가 받는 argument를 나열한 것이다.
- func : component로 만들 component wrapper 함수
- base_image : component wrapper가 실행할 image
- packages_to_install : component에서 사용해서 추가로 설치해야하는 패키지
1. base_image
- Component가 실행되는 순서
docker pull base_image
pip install packages_to_install
- run
command
- base_image에 필요한 패키지가 모두 설치되어 있으면 따로 설치 없이 사용 가능하다.
- docker file에 추가해서 다시 build 하든 하면된다.
from functools import partial
from kfp.components import InputPath, OutputPath, create_component_from_func
@partial(
create_component_from_func,
base_image="ghcr.io/mlops-for-all/base-image:latest",
)
def train_from_csv(
train_data_path: InputPath("csv"),
train_target_path: InputPath("csv"),
model_path: OutputPath("dill"),
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
if __name__ == "__main__":
train_from_csv.component_spec.save("train_from_csv.yaml")
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
container:
image: ghcr.io/mlops-for-all/base-image:latest
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def train_from_csv(
train_data_path,
train_target_path,
model_path,
kernel,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
import argparse
_parser = argparse.ArgumentParser(prog='Train from csv', description='')
_parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = train_from_csv(**_parsed_args)
args:
- --train-data
- {inputPath: train_data}
- --train-target
- {inputPath: train_target}
- --kernel
- {inputValue: kernel}
- --model
- {outputPath: model}
- 스크립트 실행 후 yaml 파일을 보면 image가 바뀐 것을 확인할 수 있다.
2. packages_to_install
- 패키지 추가될 때마다 docker image를 계속 생성하는 작업은 시간이 많이 소요된다.
packages_to_install
argument로 패키지를 컨테이너에 쉽게 추가 가능하다.
from functools import partial
from kfp.components import InputPath, OutputPath, create_component_from_func
@partial(
create_component_from_func,
packages_to_install=["dill==0.3.4", "pandas==1.3.4", "scikit-learn==1.0.1"],
)
def train_from_csv(
train_data_path: InputPath("csv"),
train_target_path: InputPath("csv"),
model_path: OutputPath("dill"),
kernel: str,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
if __name__ == "__main__":
train_from_csv.component_spec.save("train_from_csv.yaml")
name: Train from csv
inputs:
- {name: train_data, type: csv}
- {name: train_target, type: csv}
- {name: kernel, type: String}
outputs:
- {name: model, type: dill}
implementation:
container:
image: python:3.7
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill==0.3.4' 'pandas==1.3.4' 'scikit-learn==1.0.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill==0.3.4' 'pandas==1.3.4'
'scikit-learn==1.0.1' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def train_from_csv(
train_data_path,
train_target_path,
model_path,
kernel,
):
import dill
import pandas as pd
from sklearn.svm import SVC
train_data = pd.read_csv(train_data_path)
train_target = pd.read_csv(train_target_path)
clf = SVC(kernel=kernel)
clf.fit(train_data, train_target)
with open(model_path, mode="wb") as file_writer:
dill.dump(clf, file_writer)
import argparse
_parser = argparse.ArgumentParser(prog='Train from csv', description='')
_parser.add_argument("--train-data", dest="train_data_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--train-target", dest="train_target_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--kernel", dest="kernel", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = train_from_csv(**_parsed_args)
args:
- --train-data
- {inputPath: train_data}
- --train-target
- {inputPath: train_target}
- --kernel
- {inputValue: kernel}
- --model
- {outputPath: model}
- 스크립트 실행 후의 yaml파일이다.
- 실행 순서를 살펴보자.
docker pull python:3.7
pip install ....
- run
command
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill==0.3.4' 'pandas==1.3.4' 'scikit-learn==1.0.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill==0.3.4' 'pandas==1.3.4'
'scikit-learn==1.0.1' --user) && "$0" "$@"
- 위 구문이 자동으로 추가되어 필요한 패키지가 설치되므로 오류 없이 정상적으로 실행된다.