[MLOps]Vertex AI Pipeline(Kubeflow Pipeline)

JONGBAO·2024년 7월 30일
0

1. AI Platform과 ML workflow Pipeline

  • AI Platform은 머신러닝 워크플로우 각 단계에서 모델을 개발하는데 필요한 도구와 환경을 구축하는 수작업을 최소화하고 쉽게 개발하기 위한 기능을 제공한다. (Pipeline보다 큰 범위)
  • 대표적인 AI Platform은 구글 버텍스 AI(Google Vertex AI), 아마존 세이지메이커(Amazon SageMaker), 애저 머신러닝(Azure Machine Learning) 등이 있으며, 오픈소스로는 쿠브플로우(Kubeflow) 등이 있다.
  • Kubeflow는 머신러닝 워크플로우의 모델 학습 ~ 배포까지 작업에 필요한 도구와 환경을 쿠버네티스 위에서 쿠브플로우 컴포넌트 형태로 제공하며, Kubeflow Pipelines(KFP)는 ML workflow를 구축하고 배포하기 위한 ML Workflow Orchestration 도구이다.
  • Vertex AI Pipelines은 Google Cloud의 MLOps 솔루션 중 하나이며, ML Pipeline은 ML Workflow를 자동화하는 것이다.

  • Data Extraction -> Data Validation -> Data Preparation -> Model Training -> Model Evaluation -> Model Validation의 workflow를 자동화하고 MLOps의 Pipeline의 또 다른 구성요소들인 Feature Store, Metadata Store 등의 다양한 workflow orchestration을 파이프라인을 통해 수행하게 된다.
  • Pipeline 구성을 통해 반복되는 작업을 자동화할 수 있고, 재사용할 수 있다. 또한 파이프라인의 산출물을 Metadata Store에 저장하여 모델 버전별 성능 비교 및 모니터링이 가능하다. 각 step별 의존성을 줄이고 효율적으로 컨테이너를 갈아끼울 수 있다.

2. Vertex AI Pipeline

1) AI Platform Pipelines (구버전)

  • Kubernetes Cluster 필요

2) Vertex AI Pipelines (신버전)

  • Kubernetes Cluster 관리를 모두 내부 프로젝트에서 자동화
  • Tensorflow Extended (TFX) 보다 Kubeflow Pipelines SDK 권장
    • kubeflow cluster를 운영할 필요가 없고, custom component 외에 제공하는 GCPC(Google Cloud Pipeline Component)로 workflow를 상대적으로 쉽게 구현할 수 있음

3. Pipeline Workflow

  • Pipeline Step
    • input
    • output
    • container image
  • Pipeline에서 intput-output 연결을 기반으로 workflow graph DAG(directed acyclic grapth)를 만들어 step을 수행함
  • 각 파이프라인은 metadata와 artifact(model, dataset 등)을 생성하는데 이를 통해 가장 정확도가 높은 모델을 만드는 pipeline이 어떤 것인지 확인할 수 있고, hyperparameter도 확인할 수 있음

4. Pipeline 구성 코드

4-1. Pipeline 정의

from kfp.v2 import dsl

# Pipelines Workflow 정의
@dsl.pipeline(
    name='pipeline-test',
    description='pipeline test for ml workflow.',
    pipeline_root=PIPELINE_ROOT
)
# 하단의 component는 예시
def pipeline():
	# 각 단계에서의 output을 다음 component의 input으로 할당
	get_data_op = get_data()
	get_preprocessing_data_op = get_preprocessing_data(input_dataset = get_data_op.output)
    get_train_dataset_op = get_train_dataset(input_dataset = 	get_preprocessing_data_op.outputs['transformed_dataset'])
    train_causalimpact_op = train_causalimpact(train_dataset = get_train_dataset_op.outputs['train_dataset'],
                                                   index_dataset = get_preprocessing_data_op.outputs['index_dataset'],
                                                   period_dataset = get_train_dataset_op.outputs['period_dataset'])
	get_transform_bq_load_dataset_op = get_transform_bq_load_dataset(result_dataset = train_causalimpact_op.output)

4-2. YAML file로 Compile

  • json file로도 compile이 가능함
from kfp import compiler
compiler.Compiler().compile(
                            pipeline_func=pipeline,
                            package_path='pipeline_test.yaml'
                            )

4-3. Pipeline 실행

import google.cloud.aiplatform as aip
# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
        project=project_id,
        location=PROJECT_REGION,
        )
# Prepare the pipeline job
job = aip.PipelineJob(
                      display_name="pipeline-test",
                      template_path="pipeline_test.yaml",
                      pipeline_root=pipeline_root_path
                      )
job.submit()

5 Kubeflow Components

  • Component는 input과 user가 정의한 logic으로 구성되고 output을 생성할 수 있음. Component의 인스턴스화 -> task
  • KFP는 두가지의 Component 구성 방법을 제공함

5-1. Lightweight Python Components

  • @dsl.component decorater로 정의
  • Requirement
    • Type annotations: parameter와 artifacts에 대한 type annotation이 필요
    • Hermetic: function body 외에서 정의된 symbol은 참조하지 않음
# non-example! 오류 예시
invalid_val = 5
@dsl.component
def error_func(a: int) -> int:
  """Fails at runtime."""
  return invalid_val * a

5-2. Containerized Python Components

  • @dsl.container_component decorater로 정의
  • Lightweight Python Components와 달리 image, command, args를 직접 지정할 수 있음
  • dsl.ContainerSpec을 return
from kfp import dsl

@dsl.container_component
def say_hello():
  return dsl.ContainerSpec(image='alpine', command=['echo'], args=['Hello'])

0개의 댓글