이번 글 부터는 본격적으로 Kubeflow 주요 리소스인 Pipeline의 개념과 사용법에 대해서 알아보겠습니다.
Kubeflow 공식 문서에 따르면 Kubeflow Pipeline의 구성과 목적은 다음과 같습니다.
실습 주제는 올해 7~8월 DACON에서 진행된 전력량 예측 AI 경진대회에서 사용한 데이터와 XGBoost Regressor 모델 학습 프로세스를 Kubeflow Pipeline으로 구축하는 것 입니다. (https://dacon.io/competitions/official/236125/overview/descripti)
#[control Node]
$ mkdir ~/db
$ cd db
$ vim mydb-pvc.yaml
# mydb-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mydb-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi
# Storage Class를 직접 정의하지 않으면 default SC로 자동 설정됩니다.
# mydb.yaml
apiVersion: v1
kind: Pod
metadata:
name: mydb
labels:
app: mydb
spec:
containers:
- name: mydb
image: mysql:latest
env:
- name: MYSQL_ROOT_PASSWORD
value: qwer1234
ports:
- containerPort: 3306
volumeMounts:
- name: db
mountPath: /var/lib/mysql
volumes:
- name: db
persistentVolumeClaim:
claimName: mydb-pvc
# mydb-svc.yaml
apiVersion: v1
kind: Service
metadata:
name: mydb-svc
spec:
type: NodePort
ports:
- port: 3306
targetPort: 3306
selector:
app: mydb
# db 디렉토리에서 수행
$ kubectl create -f .
$ kubectl exec -it mydb -- mysql -u root -p # ROOT PW 입력하여 접속
...
mysql> CREATE DATABASE trainset; # "trainset"이라는 이름의 DB 생성
# 필요 패키지
import pandas as pd
import os
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sqlalchemy import create_engine
# 전처리 클래스 정의
class Preprocessing(object):
def __init__(self):
# 원본 훈련 데이터 불러오기
self.train=pd.read_csv('https://raw.githubusercontent.com/Parkjiwonha/prj_jwjk/main/train.csv')
def preprocessing(self):
train_df=self.train
# 전처리 관련 코드
...
return traindf
# 데이터 업로드
def data_upload(self, df):
db_connection_str = 'mysql+pymysql://root:qwer1234@10.233.51.86/trainset' # mydb-svc의 CLUSTER-IP 및 데이터베이스 입력
db_connection = create_engine(db_connection_str)
try:
conn = db_connection.connect()
print('DB 연결에 성공하였습니다.')
df.to_sql(name='traindf', con=conn, if_exists='replace',index=False) # traindf라는 이름의 테이블 생성
conn.close()
print('데이터 업로드를 완료하였습니다.')
except:
print('DB 연결/업로드에 실패하였습니다.')
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.233.0.1 <none> 443/TCP 65m
mydb-svc NodePort 10.233.51.86 <none> 3306:31078/TCP 20m
# Python
preproc = Preprocessing()
df = preproc.preprocessing()
preproc.data_upload(df)
# python
sql_statement = ''' SELECT * FROM traindf'''
new_df = pd.read_sql(sql=sql_statement, con=conn)
conn.close()
여기까지의 과정을 통해 원본 데이터를 전처리 한 후, Kubernetes 클러스터의 데이터베이스에 저장하였습니다.
이후부터 Kubeflow Pipeline을 통해서 데이터를 Load하고 Split, Train, Test & Evaluation을 컴포넌트로 하나씩 구성해 보도록 하겠습니다.