안녕하세요!
이번엔 Kubeflow 파이프라인을 개발할 때 자주 사용했던 kfp 모듈에 대해 알아보고자 합니다.
SDK라고도 하고, 파이썬 입장에서 보면 패키지이기도 합니다.
파이프라인을 컴파일할 때 주로 사용하지만, sdk만으로 파이프라인을 컴파일해서 업로드하고, 리스트를 조회하고 삭제하는 등 KubeflowUI를 사용하지 않고서도 리소스를 조작할 수 있다는 장점이 있습니다!
하지만 쿠버네티스 리소스 관련 모든 작업을 지원하지는 않는데, 할 수 있는 것을 차례대로 정리해보고자 합니다.
가상환경을 사용하신다면 자유롭게 가상환경을 구축하고 접속하신 후 진행하시면 됩니다.
다만 python 3.5 버전 이상만 지원하니 3.7버전 이상으로 여유롭게 설정해주세요.
$ pip3 install kfp --upgrade
$ export PATH=$PATH:~/.local/bin
$ which dsl-compile # 해당 위치를 정상적으로 찾아야 잘 설치된 것입니다.
접속 정보를 입력하여 kfp Client와 연결합니다.
저는 로드밸런서 타입으로 생성한 istio-ingressgateway의 external-ip를 통해 Kubeflow UI에 접근할 수 있기 때문에 HOST에 external-ip를 입력해주었습니다.
아래와 같은 방식 말고도, 직접 Kubeflow UI에 접근해서 도구 더 보기 - 개발자 도구 - Application - Storage - Cookies - <localhost:port> - authservice-session
에 생성되어 있는 토큰값을 하드코딩으로 입력해도 되긴 합니다만, 언제 쿠키가 새로 발급될지 모르고 번거로우니 아래처럼 requests로 받아오는 방식을 이용합니다!
import kfp
import requests
USERNAME = "user@example.com"
PASSWORD = "12341234"
NAMESPACE = "kubeflow-user-example-com"
HOST = "http://192.168.0.244:80" # istio-ingressgateway's external-ip created by the load balancer.
session = requests.Session()
response = session.get(HOST)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
data = {"login": USERNAME, "password": PASSWORD}
session.post(response.url, headers=headers, data=data)
session_cookie = session.cookies.get_dict()["authservice_session"]
client = kfp.Client(
host=f"{HOST}/pipeline",
namespace=f"{NAMESPACE}",
cookies=f"authservice_session={session_cookie}",
)
print(client.list_pipelines())
# KF UI에 업로드된 파이프라인의 ID 찾기
pipeline_id = client.get_pipeline_id("automl_pipeline")
# KF UI에 업로드된 파이프라인 정보 획득
info_pipeline = client.get_pipeline(pipeline_id=pipeline_id)
info_pipeline
# 실행된 Run 정보를 확인합니다. 실행중이면 {'status': 'Running'}, 종료되었으면 {'status': 'Succeeded'} 입니다.
info_run = client.get_run(run_id="34b2ee3a-1aa6-43cb-a2d8-6a1dab9e24cb")
info_run
# 로컬 컨텍스트 구성에서 사용자 네임스페이스를 확인합니다. 지정되어있지 않으면 공백입니다.
client.get_user_namespace()
# experiment 리스트를 확인합니다.
list_experiments = client.list_experiments()
print(list_experiments)
experiment_id = list_experiments.experiments[0].id
print(experiment_id)
# 해당 id의 파이프라인 버전을 나열합니다.
list_pipeline_versions = client.list_pipeline_versions(pipeline_id=info_pipeline.default_version.id)
list_pipeline_versions
# 모든 파이프라인 정보를 나열합니다.
list_pipelines = client.list_pipelines(page_size=50)
list_pipelines
# 모든 파이프라인 이름만 나열합니다.
for i in range(list_pipelines.total_size):
print(list_pipelines.pipelines[i].default_version.name)
# experiment_id 내에 생성되어 있는 반복 작업 정보를 가져옵니다.
list_recurring_runs = client.list_recurring_runs(experiment_id=experiment_id)
list_recurring_runs
recurring_job_id = list_recurring_runs.jobs[0].id
# recurring_run 세부 정보를 가져옵니다.
info_recurring_run = client.get_recurring_run(job_id=recurring_job_id)
info_recurring_run
# 생성된 run을 나열하고, run id를 모두 가져옵니다.
list_runs = client.list_runs(page_size=50)
for i in range(list_runs.total_size):
print(list_runs.runs[i].id)
# 상태체크같은데 정확이 어떤 상태를 보는건지는 모르겠습니다.
client.get_kfp_healthz()
# KF UI 내 Experiments(KFP) 정보 획득
info_experiment = client.get_experiment(experiment_name="haram", namespace=NAMESPACE)
info_experiment
# 파이프라인을 KF 클러스터에 업로드합니다.(UI에 등록), 같은 pipeline_name으로 중복 업로드는 불가능합니다.
client.upload_pipeline(pipeline_name="kfp_sdk_test", pipeline_package_path="./automl_pipeline.tar.gz", description="the pipeline uploaded by python sdk")
# 파이프라인 버전을 업데이트합니다(UI에 등록)
updated_pipeline = client.upload_pipeline_version(pipeline_name="kfp_sdk_test", pipeline_version_name="v220614_2", pipeline_package_path="./automl_pipeline.tar.gz", description="the pipeline uploaded by python sdk")
updated_pipeline
# 파이프라인을 실행해 run을 생성합니다. job_name=run_name이며, 컴파일된 파이프라인 파일의 경로를 지정합니다.
exec_run = client.run_pipeline(experiment_id=experiment_id, job_name="kfp_skd_test", pipeline_package_path="./automl_pipeline.tar.gz", enable_caching=False, params=None)
exec_run
# Run이 완료될 때까지 기다렸다가 결과를 리턴합니다.
client.wait_for_run_completion(run_id=exec_run.id, timeout=345600)
# 리소스 삭제 관련해서는 쓸만한 메서드가 없습니다.
정리해주셔서 감사합니다 ^^ 많은 도움 받고가요!