0. INTRO
- Kubernetes 리소스들을 Python으로 보거나 제어할 수 있도록 도와주는
kubernetes
라는 라이브러리가 있다. 해당 라이브러리를 활용하여 특정 Job을 실행하는 방법을 다뤄볼 것이다.
- 아래와 같이 크게 두 가지 방법으로 활용할 수 있을 것이다.
- Job 리소스 YAML 파일을 실행하도록 코드 구성
- API 형태로 실행되도록 코드 구성
- 본문에 들어가기 앞서 아래 명령어를 통해
kubernetes
라이르러리를 설치해준다.pip install kubernetes
1. YAML 파일 직접 실행
- Kubernetes Job 리소스를 실행시키기 위한 Yaml 파일을 Python 코드로 실행하는 방법이다.
kubectl -f apply
명령어를 통해서도 실행이 가능하지만 python 코드에 통합될 수 있다는 장점이 있다.
- 리소스의 설정이 많아지면 kubernetes 라이브러리에서 제공해주는 코드를 활용했을 때가 오히려 더 복잡해질 수 있기 때문에 Yaml 파일 자체를 실행시키는 것이 더 나은 경우가 있다.
from kubernetes import client as kubernetes_client
from kubernetes import config
from kubernetes import utils
from icecream import ic
import time
import yaml
config.load_kube_config('/home/ubuntu/.kube/config')
ns_name = 'default'
with open([Job YAML 파일 경로]) as f:
pipeline_job = yaml.safe_load(f)
k8s_client = kubernetes_client.api_client.ApiClient()
utils.create_from_yaml( k8s_client,
yaml_objects=[pipeline_job],
namespace=ns_name )
time.sleep(3)
core_v1_api = kubernetes_client.CoreV1Api()
while True:
pods = core_v1_api.list_namespaced_pod(ns_name)
pod_status = [pod.status.phase for pod in pods.items]
if pod_status.count("Succeeded") == len(pod_status):
ic(pod_status)
break
else:
ic(pod_status)
time.sleep(1)
2. API 형태로 실행
- kubernetes 라이브러리에서 제공하는 method들을 사용하여 Job 리소스에 대한 생성, 실행, 종료, 상태 조회 기능을 담은 Class를 만들어보았다.
KubeJob
Class에 대한 객체 생성 후 내부 method들을 실행하면 python native하게 해당 리소스를 실행할 수 있다.
- 아래에서 사용한
dataclasses
라이브러리는 Class 작성시 __init__
을 생략할 수 있도록 해주고 변수 타입 및 입력 형태 강제 등 많은 옵션들을 주어 Class 작성을 조금 더 편하고 깔끔하게 할 수 있도록 도와주는 python 내장 라이브러리이다. (Docs 참고 -> dataclasses Docs )
from time import sleep
from os import path
from icecream import ic
from time import sleep
from kubernetes import client, config
from dataclasses import dataclass
@dataclass
class KubeJob:
config.load_kube_config()
JOB_NAME: str
NAMESPACE: str
def __post_init__(self):
self.api_instance = client.BatchV1Api()
self.create_object()
def create_object(self, **kwargs):
ic("CREATE JOB OBJECT")
container = client.V1Container(
name="test_job",
image="alpine",
command=["printenv"],
env=[
client.V1EnvVar(name="NAME", value="apple"),
client.V1EnvVar(name="TYPE", value="banana"),
],
volume_mounts=[
client.V1VolumeMount(mount_path="/mnt", name="test_vol")
]
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": self.JOB_NAME}),
spec=client.V1PodSpec(
restart_policy="Never",
containers=[container],
volumes=[client.V1Volume(
name="test_vol",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name="pvc_test_vol")
)]))
spec = client.V1JobSpec(
template=template,
backoff_limit=4,
ttl_seconds_after_finished=30
)
self.job_obj = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name=self.JOB_NAME),
spec=spec)
return self
def get_job_status(self):
job_completed = False
while not job_completed:
api_response = self.api_instance.read_namespaced_job_status(
name=self.JOB_NAME,
namespace=self.NAMESPACE)
if api_response.status.succeeded is not None or \
api_response.status.failed is not None:
job_completed = True
sleep(1)
print(f"Job status='{str(api_response.status)}'")
def create_job(self):
ic("CREATE JOB")
api_response = self.api_instance.create_namespaced_job(
body=self.job_obj,
namespace=self.NAMESPACE)
print(f"Job created. status='{str(api_response.status)}'")
def delete_job(self):
ic("DELETE JOB")
api_response = self.api_instance.delete_namespaced_job(
name=self.JOB_NAME,
namespace=self.NAMESPACE,
body=client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=5))
print(f"Job deleted. status='{str(api_response.status)}'")
if __name__ == '__main__':
job1 = KubeJob("test", "default")
job1.create_job()
3. OUTRO
- 이번 글에서는 Job 리소스에 대해서만 다뤄 보았는데
kubernetes
라이브러리에는 거의 모든 리소스들에 대한 API를 지원해주고 있다.
- 물론 YAML 형태로 작성하는 것이 가독성이나 직관성 측면에서는 훨씬 낫고 Job이 아닌 Deployment를 통한 App의 배포라면 Jenkins나 ArgoCD 등과 같이 이러한 작업에 특화된 오픈소스들을 이용하는 것이 더 좋을 것이라 생각한다.
- 하지만 리소스 실행하는 부분이 python의 pipeline안에 녹아들어 통합적으로 운영되어야 하는 상황이라면 python native한 코드로 kubernetes의 리소스들을 다룰 수 있어 활용성이 상당히 좋을 것 같다.
4. 참고 자료