[K8S] Python으로 Kubernetes Job을 실행시키는 방법들

NewNewDaddy·2024년 3월 26일
0

DOCKER-KUBERNETES

목록 보기
9/10
post-thumbnail

0. INTRO

  • Kubernetes 리소스들을 Python으로 보거나 제어할 수 있도록 도와주는 kubernetes라는 라이브러리가 있다. 해당 라이브러리를 활용하여 특정 Job을 실행하는 방법을 다뤄볼 것이다.
  • 아래와 같이 크게 두 가지 방법으로 활용할 수 있을 것이다.
    1. Job 리소스 YAML 파일을 실행하도록 코드 구성
    2. API 형태로 실행되도록 코드 구성
  • 본문에 들어가기 앞서 아래 명령어를 통해 kubernetes 라이르러리를 설치해준다.
    pip install kubernetes

1. YAML 파일 직접 실행

  • Kubernetes Job 리소스를 실행시키기 위한 Yaml 파일을 Python 코드로 실행하는 방법이다.
  • kubectl -f apply 명령어를 통해서도 실행이 가능하지만 python 코드에 통합될 수 있다는 장점이 있다.
  • 리소스의 설정이 많아지면 kubernetes 라이브러리에서 제공해주는 코드를 활용했을 때가 오히려 더 복잡해질 수 있기 때문에 Yaml 파일 자체를 실행시키는 것이 더 나은 경우가 있다.
from kubernetes import client as kubernetes_client
from kubernetes import config
from kubernetes import utils
from icecream import ic
import time
import yaml

## kube config 파일 위치 및 변수 설정
config.load_kube_config('/home/ubuntu/.kube/config')
ns_name = 'default'

## YAML 파일 읽기
with open([Job YAML 파일 경로]) as f:
    pipeline_job = yaml.safe_load(f)
    
## kubernetes client 선언
k8s_client = kubernetes_client.api_client.ApiClient()

## YAML 내용 실행
utils.create_from_yaml( k8s_client, 
						yaml_objects=[pipeline_job],
                        namespace=ns_name )

## Pod 생성까지 대기 시간 설정
time.sleep(3)

## Pod의 상태 출력 (Job이 완료되면 종료)
core_v1_api = kubernetes_client.CoreV1Api()

while True:
    pods = core_v1_api.list_namespaced_pod(ns_name)
    pod_status = [pod.status.phase for pod in pods.items]
    if pod_status.count("Succeeded") == len(pod_status):
        ic(pod_status)
        break
    else:
        ic(pod_status)
        time.sleep(1)

2. API 형태로 실행

  • kubernetes 라이브러리에서 제공하는 method들을 사용하여 Job 리소스에 대한 생성, 실행, 종료, 상태 조회 기능을 담은 Class를 만들어보았다.
  • KubeJob Class에 대한 객체 생성 후 내부 method들을 실행하면 python native하게 해당 리소스를 실행할 수 있다.
  • 아래에서 사용한 dataclasses 라이브러리는 Class 작성시 __init__을 생략할 수 있도록 해주고 변수 타입 및 입력 형태 강제 등 많은 옵션들을 주어 Class 작성을 조금 더 편하고 깔끔하게 할 수 있도록 도와주는 python 내장 라이브러리이다. (Docs 참고 -> dataclasses Docs )
from time import sleep
from os import path
from icecream import ic
from time import sleep
from kubernetes import client, config
from dataclasses import dataclass


@dataclass
class KubeJob:
	## 기본 input내용 및 config 값 선언
    config.load_kube_config()
    JOB_NAME: str
    NAMESPACE: str
    
    ## 객체 생성과 동시에 적용될 작업들 선언
    def __post_init__(self):
        self.api_instance = client.BatchV1Api()
        self.create_object()
    
    ## Job 객체 생성
    def create_object(self, **kwargs):
        ic("CREATE JOB OBJECT")
        container = client.V1Container(
            name="test_job",
            image="alpine",
            command=["printenv"],
            env=[
                client.V1EnvVar(name="NAME", value="apple"),
                client.V1EnvVar(name="TYPE", value="banana"),
                ],
            volume_mounts=[
                client.V1VolumeMount(mount_path="/mnt", name="test_vol")
                ]
            )
        # Create and configure a spec section
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": self.JOB_NAME}),
            spec=client.V1PodSpec(
                restart_policy="Never",
                containers=[container],
                volumes=[client.V1Volume(
                    name="test_vol",
                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name="pvc_test_vol")
                    )]))
        # Create the specification of deployment
        spec = client.V1JobSpec(
            template=template,
            backoff_limit=4,
            ttl_seconds_after_finished=30
            )
        # Instantiate the job object
        self.job_obj = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=self.JOB_NAME),
            spec=spec)
        return self

	## Job 상태 조회
    def get_job_status(self):
        job_completed = False
        while not job_completed:
            api_response = self.api_instance.read_namespaced_job_status(
                name=self.JOB_NAME,
                namespace=self.NAMESPACE)
            if api_response.status.succeeded is not None or \
                    api_response.status.failed is not None:
                job_completed = True
            sleep(1)
            print(f"Job status='{str(api_response.status)}'")

    ## Job 생성
    def create_job(self):
        ic("CREATE JOB")
        api_response = self.api_instance.create_namespaced_job(
            body=self.job_obj,
            namespace=self.NAMESPACE)
        print(f"Job created. status='{str(api_response.status)}'")
        # self.get_job_status()
    
    ## Job 삭제
    def delete_job(self):
        ic("DELETE JOB")
        api_response = self.api_instance.delete_namespaced_job(
            name=self.JOB_NAME,
            namespace=self.NAMESPACE,
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=5))
        print(f"Job deleted. status='{str(api_response.status)}'")
    
if __name__ == '__main__':

    job1 = KubeJob("test", "default")
    
    job1.create_job()

3. OUTRO

  • 이번 글에서는 Job 리소스에 대해서만 다뤄 보았는데 kubernetes 라이브러리에는 거의 모든 리소스들에 대한 API를 지원해주고 있다.
  • 물론 YAML 형태로 작성하는 것이 가독성이나 직관성 측면에서는 훨씬 낫고 Job이 아닌 Deployment를 통한 App의 배포라면 Jenkins나 ArgoCD 등과 같이 이러한 작업에 특화된 오픈소스들을 이용하는 것이 더 좋을 것이라 생각한다.
  • 하지만 리소스 실행하는 부분이 python의 pipeline안에 녹아들어 통합적으로 운영되어야 하는 상황이라면 python native한 코드로 kubernetes의 리소스들을 다룰 수 있어 활용성이 상당히 좋을 것 같다.

4. 참고 자료

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글