[Airflow] KubernetesPodOperator 란?

김지환·2022년 9월 21일
0

Airflow on Kubernetes

  • 에어플로우의 강정은 유연성에 있다. 여러 범주의 서비스들을 통합하여 사용할 수 있어 이를 통한 Multi platform workflow를 만들 수 있지만, 이런 다양한 서비스를 하나의 프로젝트로 관리하기에는 의존성 관리의 문제가 생기게 된다. 서로 다른 팀들 끼리 필요로 하는 Library가 모두 다를 수 있다.
  • 이러한 문제를 해결하기 위해서 Airflow 를 Kubernetes 환경에서 동작시키게 되면 각 서비스별 독립적인 Pod를 실행할 수 있다.

KubernetesPodOperator


동작 원리

KubernetesPodOperator 는 Kubernetes Python Client 를 이용하여 API Request를 보낸다. Pod 의 spec은 KubernetesPodOperator 에 유저가 정의한 spec으로 생성되게 된다.

해당 Operator를 동작시키는 Executor는 기본 설정 방식에 따라 동작을 하게 되고 Worker node에서 해당 API Request를 보내는 것이다.

Executor에 따른 전반적인 실행 과정

e.g. )

  1. CeleryExecutor
  • KubernetesPodOperator( KPO ) 에서 Pod의 affinity는 node 2로 가정한다.
  1. KubernetesExecutor
  • KubernetesPodOperator( KPO ) 에서 Pod의 affinity는 node 2로 가정한다.
  • pod_template.yaml 에서 기본 node affinity는 Node 1이라고 가정한다.

Pod 의 생성 및 Node할당되어 동작하기까지의 세부 동작

  1. airflow 에서 api request를 kube api-server에 날리게 된다.
  2. kube scheduler 는 각 node의 정보들을 갖고 있는데 해당 Pod 의 spec ( affinity, selector etc ) 등의 정보를 토대로 Node를 할당하여 etcd 에 저장하는 api request를 kube api-server에 보낸다.
  3. 각 node에는 kubelet component 가 설치돼 있는데 해당 component 또한 scheduler 처럼 주기적으로 kube api-server 에 watch request를 날리며 etcd에 본인에게 해당하는 pod정보가 있는지 확인을 한다. 이 때 본인에게 해당하는 pod 가 있다면 kubelet은 그 정보를 받아온다.
  4. kubelet 은 내부 docker에 container를 올리도록 요청을하고 kube-proxy 에 container에서 사용할 network생성을 요청하여 부여하게된다.

Code

KubernetesPodOperator 를 사용하기 위해서는

apache-airflow-providers-cncf-kubernetes 해당 package가 설치돼 있어야 한다.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()
  • name : 실행할 Task의 이름 [a-z0-9.-] 로만 구성 가능.
  • image: Docker image 기본적으로 hub.docker.com 으로 위치가 잡혀있다. AWS, GCP 환경에서의 Kubernetes cluster라면 해당 container registry endpoint를 사용하여 가져올 수도 있다.
  • cmds: Container 의 entrypoint 만약 정의하지 않는다면 docker image에서의 entrypoint가 사용된다.
  • arguments: entrypoint에 사용될 argument 들 만약 설정되지 않으면 docker image의 CMD가 사용된다.
  • labels: Pod 에 적용될 label
  • task_id: 해당 task의 id
  • do_xcom_push: 각 task간 데이터 전달이 가능하게 하는 flag이다.

이 외에도 많은 parameters 들이 사용되는데 실제로 사용 예제들이 많지 않아서 자주 쓰이는 parameter 들이 어떻게 사용되는지 적어본다.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

secret_env = k8s.V1Secret(
    deploy_type='env',  # env, volume 2가지 타입이 있다. volume을 사용하면 deploy_target 에 path를 적으면 된다.
    deploy_target=None, # deploy_target을 설정하지 않으면 모든 secrets들을 mount 한다. 
		# 특정 secret을 사용하기 위해서는 key parameter에 str 타입으로 이름을 쓰면 된다.
    secret="secret object name", #  Kuberntes 의 secret object 이름.
)

env_from = [
    k8s.V1EnvFromSource(
				# configmap fields를  key-value 형태의 dict 타입으로 전달한다. 
        config_map_ref=k8s.V1ConfigMapEnvSource(name="Your configmap in Kubernetes"),
				# secret fields를  key-value 형태의 dict 타입으로 전달한다.
        secret_ref=k8s.V1SecretEnvSource(name="Your secret in Kubernetes")),
]

envs = {
    "TEST1": os.getenv("TEST1", ""),
    "TEST2": os.getenv("TEST2", ""),
    "TEST3": os.getenv("TEST3", "")
}

# 환경변수에서 직접 가져와서 List 형태로 만들어 사용할 수도 있다.
env_vars = [k8s.V1EnvVar(name=_key, value=_value) for _key, _value in envs.items()]

# affinity 또는 selector 등을 param으로 주고 사용할 수 있다.
affinity = k8s.V1Affinity(
    node_affinity={
        'requiredDuringSchedulingIgnoredDuringExecution': {
            'nodeSelectorTerms': [
                {
                    'matchExpressions': [
                        {
                            'key': 'alpha.eksctl.io/nodegroup-name',
                            'operator': 'In',
                            'values': [
                                'ng-spot',
                            ]
                        }]
                }
            ]
        }
    })

# Pod 의 resource 를 할당해준다.
resources = k8s.V1ResourceRequirements(
    limits={"memory": "1Gi", "cpu": "1"},
    requests={"memory": "500Mi", "cpu": "0.5"},
)

example_operator = KubernetesPodOperator(
        task_id="example",
        # Name of task you want to run, used to generate Pod ID.
        name="example",
        cmds=["bash", "-cx"],
        arguments=[f"sleep 10000"],
        namespace="test",
        service_account_name="your service account", # 해당 Pod 를 돌리기 위한 service account 설정.
        secrets=[secret_env],
        env_vars=env_vars,
        env_from=env_from,
        is_delete_operator_pod=True,
        image=f"{os.getenv('IMAGE_PATH')}:{os.getenv('IMAGE_TAG', 'latest')}",
        image_pull_policy="Always", # image pull을 어떤 정책으로 할건지 설정.
        affinity=affinity,
        resources=resources
    )

example_operator.dry_run()

위와 같이 KubernetesPodOperator를 사용할 수 있는데 사용해보면서 느꼈던 몇 가지 유의사항을 적어보자면

  1. 사용했던 airflow version은 2.2.3 이 었는데 service_account_name 에 원하는 account를 등록해도 해당 namepsace의 default service account를 사용하는 이슈가 있었는데 trouble shooting이 필요해 보인다.
  2. image_pull_policy 설정에 주의하자 Always 로 설정을 해주면 매번 pull 을 받아오게 된다. IfNotPresent를 사용하게 되면 kubelet 해당 image의 cache정보가 있다면 가져와서 사용하게 된다. 해당 policy를 따로 설정하지 않으면 IfNotPresent로 사용되서 자동으로 cache를 사용하게 되므로 image 업데이트 했는데 반영이 안된다고 삽질을 하면 안된다.
  3. pod 의 resource 선정은 왠만하면 해두는게 좋다. pod 의 resource설정이 되어 있지 않으면 리소스 문제가 발생시 K8s 의 QoP class에 의해서 가장 우선순위로 resource 설정이 되어있지 않은 pod부터 제거가 된다. 또한 설정하지 않았다면 그에 따른 node 분배도 제대로 되지 않을 것이다.
  4. is_delete_operator_pod 설정을 해주지 않으면 성공된 파드도 삭제되지 않아서 resource 문제를 야기할 수 있다. 따로 이유가 있지 않다면 True로 설정하는것이 좋다.
profile
Developer

0개의 댓글