AWS EKS 클러스터 기반에서 Kubeflow를 설치하고 pipeline을 생성하는 방법을 알아보겠습니다.
Amazon EKS는 자체 Kubernetes 제어 플레인을 설치 및 운영할 필요 없이 AWS에서 Kubernetes를 쉽게 사용할 수 있도록 해주는 관리형 서비스입니다.
CLI환경에서 Kubernetes를 관리하기 위한 프로그램입니다.
클러스터의 구성, Application의 배포 및 검사, 리소스 관리, 로그 확인 등의 기능이 있습니다.
설치 방법
- 최신 버전의 kubectl를 다운로드
curl -LO https://dl.k8s.io/release/v1.23.0/bin/linux/amd64/kubectl
- kubectl 파일에 실행권한 부여
chmod +x ./kubectl
- kubectl 파일을 바이너리 폴더로 이동
sudo mv ./kubectl /usr/local/bin/$ kubectl
- kubectl 버전 확인
kubectl version --client
AWS 계정 권한으로 AWS Application을 관리하기 위해 설치합니다.
설치
- AWS CLI2 설치
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" unzip awscliv2.zip
- AWS CLI 버전 확인
aws --version
IAM을 사용하여 Kubernetes 클러스터에 인증을 제공하기 위해 설치
# mac os
brew install aws-iam-authenticator
실행권한 부여
chmod +x ./aws-iam-authenticator
바이너리 파일을 $HOME/bin 위치로 이동
mkdir -p $HOME/bin && cp ./aws-iam-authenticator $HOME/bin/aws-iam-authenticator && export PATH=$PATH:$HOME/bin
환경변수 추가
echo 'export PATH=$PATH:$HOME/bin' >> ~/.bashrc
실행 확인
aws-iam-authenticator help
AWS EKS 클러스터 구축을 위한 프로그램입니다.
- 최신버전 설치
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
- 바이너리 파일을 /usr/local/bin 위치로 이동
sudo mv /tmp/eksctl /usr/local/bin
- 버전 확인
eksctl version
$ echo 'yq() {
docker run --rm -i -v "${PWD}":/workdir mikefarah/yq yq "$@"}' | tee -a ~/.bashrc && source ~/.bashrc
brew install jq
echo 'export LBC_VERSION="v2.0.0"' >> ~/.bash_profile
. ~/.bash_profile
Bastion Server가 관리자 권한으로 AWS Application에 접근할 수 있도록 IAM Role을 부여해야 합니다.
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export AWS_REGION=$(curl -s 169.254.169.254/latest/dynamic/instance-identity/document | jq -r '.region')
test -n "$AWS_REGION" && echo AWS_REGION is "$AWS_REGION" || echo AWS_REGION is not set
echo "export ACCOUNT_ID=${ACCOUNT_ID}" | tee -a ~/.bash_profile
echo "export AWS_REGION=${AWS_REGION}" | tee -a ~/.bash_profile
aws configure set default.region ${AWS_REGION}
aws configure get default.region
aws sts get-caller-identity --query Arn | grep eksworkshop-admin-i -q && echo "IAM role valid" || e
해당 포스팅에서는 EC2를 노드로 구성하는 방법으로 진행됩니다.
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: kube-test
region: ap-northeast-2
version: '1.20'
nodeGroups:
- name: cpu-nodegroup
instanceType: m5.xlarge
desiredCapacity: 3
minSize: 0
maxSize: 6
volumeSize: 20
ssh:
allow: true
publicKeyPath: '~/.ssh/id_rsa.pub'
주의사항
yaml 파일은 들여쓰기가 중요합니다. 주의해서 작성해주세요.
포스팅 작성시점 기준 EKS에서 지원하는 Kubernetes 최신 버전은 1.21입니다. 1.21 버전은 jwt 호환성 문제가 있습니다. 1.21 버전으로 진행하셔도 좋으나 호환성 문제 해결을 별도로 진행해주셔야 합니다.
Auth Dex 에러 해결 : 참고
클러스터 구성 노드가 낮은 스펙인 경우 추후 kubeflow 설치에 문제가 발생할 수 있습니다.
(t2.micro 인스턴스에서 문제 발생)
ssh-keygen -t rsa
cat .ssh/id_rsa.pub
eksctl create cluster -f cluster.yaml
kubectl get nodes
AWS EKS와 호환이 검증된 kubeflow 1.2버전을 설치합니다.
curl -L -O -J https://github.com/kubeflow/kfctl/releases/download/v1.2.0/kfctl_v1.2.0-0-gbc038f9_linux.tar.gz
tar -xvf kfctl_v1.2.0-0-gbc038f9_linux.tar.gz
mv ./kfctl /usr/local/bin
export PATH=$PATH:"/usr/local/bin"
export CONFIG_URI="https://raw.githubusercontent.com/kubeflow/manifests/v1.2-branch/kfdef/kfctl_aws.v1.2.0.yaml"
export AWS_CLUSTER_NAME=kube-test
mkdir ${AWS_CLUSTER_NAME} && cd ${AWS_CLUSTER_NAME}
wget -O kfctl_aws.yaml $CONFIG_URI
kfctl apply -V -f kfctl_aws.yaml
주의사항
설치까지 수 분이 소요될 수 있습니다.
이전 포스트에서 언급했듯이 너무 낮은 스펙의 EC2를 노드로 설정하면 apply과정에서 에러가 발생할 수 있습니다.
kubectl -n kubeflow get all
kubectl get ingress -n istio-system
kubectl port-forward --address=0.0.0.0 -n istio-system svc/istio-ingressgateway 8080:80
주의사항
포트포워딩 하는 경우 접근해야 하는 ip는 클러스터 ip가 아닌 Bastion Server ip입니다.
기본 계정
계정: admin@kubeflow.org
비밀번호 : 12341234
본 예제에서는 보스턴 집값 예측 모델 학습 및 평가 코드를 사용합니다.
Source Git
scikit-learn에서 제공하는 예제 데이터를 로드한 뒤 train_test_split 함수로 학습 데이터와 테스트 데이터를 나눠 저장하는 작업을 합니다.
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np
def _preprocess_data():
X, y = datasets.load_boston(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
np.save('x_train.npy', X_train)
np.save('x_test.npy', X_test)
np.save('y_train.npy', y_train)
np.save('y_test.npy', y_test)
if __name__ == '__main__':
print('Preprocessing data...')
_preprocess_data()
전처리된 학습 데이터를 입력으로 받아 확률적 경사하강법 모델을 학습 후 모델을 저장하는 작업을 합니다.
import argparse
import joblib
import numpy as np
from sklearn.linear_model import SGDRegressor
def train_model(x_train, y_train):
x_train_data = np.load(x_train)
y_train_data = np.load(y_train)
model = SGDRegressor(verbose=1)
model.fit(x_train_data, y_train_data)
joblib.dump(model, 'model.pkl')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--x_train')
parser.add_argument('--y_train')
args = parser.parse_args()
train_model(args.x_train, args.y_train)
전처리된 테스트 데이터로 학습된 모델의 MSE(Mean Squared Error)를 계산해 파일로 저장합니다.
import argparse
import joblib
import numpy as np
from sklearn.metrics import mean_squared_error
def test_model(x_test, y_test, model_path):
x_test_data = np.load(x_test)
y_test_data = np.load(y_test)
model = joblib.load(model_path)
y_pred = model.predict(x_test_data)
err = mean_squared_error(y_test_data, y_pred)
with open('output.txt', 'a') as f:
f.write(str(err))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--x_test')
parser.add_argument('--y_test')
parser.add_argument('--model')
args = parser.parse_args()
test_model(args.x_test, args.y_test, args.model)
평가된 모델을 deploy하는 작업을 합니다.(보통 S3같은 스토리지에 저장하게되는데 본 예제에서는 다루지 않습니다.)
import argparse
def deploy_model(model_path):
print(f'deploying model {model_path}...')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--model')
args = parser.parse_args()
deploy_model(args.model)
작성한 예제 코드와 Dockerfile을 이용해 Docker Hub에 이미지를 push합니다.
아래 작업 전 Docker Hub 계정 생성 및 Repository 생성이 필요합니다.(DockerHub)
docker build ./boston_housing/preprocess_data --tag bell/boston_pipeline_preprocessing
docker build ./boston_housing/train --tag bell/boston_pipeline_train
docker build ./boston_housing/test --tag bell/boston_pipeline_test
docker build ./boston_housing/deploy_model --tag bell/boston_pipeline_deploy_model
docker push bell/boston_pipeline_preprocessing
docker push bell/boston_pipeline_train
docker push bell/boston_pipeline_test
docker push bell/boston_pipeline_deploy_model
push한 컨테이너 이미지들을 이용하여 pipeline을 구축합니다.
from kfp import dsl
def preprocess_op():
return dsl.ContainerOp(
name='Preprocess Data',
image='bell/boston_pipeline_preprocessing:latest',
arguments=[],
file_outputs={
'x_train': '/app/x_train.npy',
'x_test': '/app/x_test.npy',
'y_train': '/app/y_train.npy',
'y_test': '/app/y_test.npy',
}
)
def train_op(x_train, y_train):
return dsl.ContainerOp(
name='Train Model',
image='bell/boston_pipeline_train:latest',
arguments=[
'--x_train', x_train,
'--y_train', y_train
],
file_outputs={
'model': '/app/model.pkl'
}
)
def test_op(x_test, y_test, model):
return dsl.ContainerOp(
name='Test Model',
image='bell/boston_pipeline_test:latest',
arguments=[
'--x_test', x_test,
'--y_test', y_test,
'--model', model
],
file_outputs={
'mean_squared_error': '/app/output.txt'
}
)
def deploy_model_op(model):
return dsl.ContainerOp(
name='Deploy Model',
image='bell/boston_pipeline_deploy_model:latest',
arguments=[
'--model', model
]
)
@dsl.pipeline(
name='Boston Housing Pipeline',
description='An example pipeline that trains and logs a regression model.'
)
def boston_pipeline():
_preprocess_op = preprocess_op()
_train_op = train_op(
dsl.InputArgumentPath(_preprocess_op.outputs['x_train']),
dsl.InputArgumentPath(_preprocess_op.outputs['y_train'])
).after(_preprocess_op)
_test_op = test_op(
dsl.InputArgumentPath(_preprocess_op.outputs['x_test']),
dsl.InputArgumentPath(_preprocess_op.outputs['y_test']),
dsl.InputArgumentPath(_train_op.outputs['model'])
).after(_train_op)
deploy_model_op(
dsl.InputArgumentPath(_train_op.outputs['model'])
).after(_test_op)
if __name__ == "__main__":
import kfp.compiler as compiler
compiler.Compiler().compile(boston_pipeline, __file__ + ".tar.gz")
dsl-compile --py pipeline.py --output ironkey-boston-pipeline.tar.gz
컴파일된 파이프라인을 업로드하고 실행시켜봅니다.
파이프라인 업로드
updload pipeline 버튼을 클릭한 뒤 생성한 tar.gz 파일을 업로드합니다.
환경 생성
create experiment 버튼을 클릭합니다.
run name을 입력후 Start 버튼을 클릭합니다.
참조 :
https://towardsdatascience.com/machine-learning-pipelines-with-kubeflow-4c59ad05522
https://github.com/gnovack/kubeflow-pipelines