회사에서 다루는 데이터는 비트코인, 이더리움등 다양한 체인에서 발생하는 온체인 데이터로 이를 재가공하여 고객들에게 제공하고 있다.
일부 가공 데이터는 API 서버단에서 다이나믹하게 계산이 되는데 고객이 늘어남에 따라 계산에 많은 부하가 생기기 시작했다.
그래서 이제는 가공된 데이터를 미리 저장하기로 하였고, 여러 소스로부터 데이터를 가공하기 위한 인프라가 필요하게 됐다.
기존에는 다양한 온체인 데이터, 마켓 데이터 등을 각각 따로 ETL 서버를 두고 개발 및 운영을 하였다. 그렇다 보니 새로운 데이터를 추가할 때마다 혹은 scaling 작업을 하는데에 많은 리소스가 사용됐었다.
하여 제공할 수 있도록 데이터 워크플로우를 개선하기로 결정했다.
인프라 구성을 하면서 고민했던 부분 위주로 정리를 했다.
데이터 워크플로우를 새로 만들기 위해서 고민하다 airflow framework를 사용해보기로 했다.
Airflow 는 워크플로우를 프로그래밍 방식으로 작성, 운영, 모니터링 할 수있게 해주는 툴이다.
Airflow 의 task는 Directed Acyclic Graphs (DAGs) 으로 묶여 scheduler가 이를 실행하고 worker들에게 이들을 할당하여 task들을 dependencies에 맞게 실행한다.
Airflow 는 크게 4가지 원칙을 지키고 있는데 각각
Airflow의 Dag을 실행하는 executor는 LocalExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor 가 있다. 처음에는 KubernetesExecutor를 사용하기로 했었는데 사용하다보니 문제가 있었다. 주기가 짧고 양이 많은 Task의 경우에는 Pod를 생성하는데 걸리는 시간 때문에 효율성이 많이 떨어졌었다. 그래서 주기가 짧고 양이 많은 Task를 가진 Dag은 CeleryExecutor를 사용할 필요가 있었다.
그래서 최종적으로 선택한 executor는 CeleryKubernetesExecutor이다. airflow operator를 동작시킬 때 queue 매개변수에 인자값을 어떻게 주느냐에 따라 executor를 선택할 수 있다.
@task(queue="nft_worker")
def create_whale_table():
...
default값으로는 kubernetes queue를 사용하면 KubernetesExecutor를 사용하게 되고 이외에 다른 queue값을 넣거나 인자를 전달하지 않으면 default celery queue로 task가 등록되서 CeleryExecutor를 사용하게된다.
[core]
...
executor = CeleryKubernetesExecutor
...
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
데이터 가공을 위해서는 Source가 되는 테이블과 의존성이 생기기 마련이다. 이를 Dag의 task로 구성하여 flow를 구축하고 주기적으로 cron을 돌리는 일련의 과정을 airflow가 굉장히 fit하게 맞춰줄 수 있을 것으로 보였다.
또한 이러한 배치잡을 python코드만으로 구성할 수 있는 점 또한 매력적으로 느껴졌다.
무엇보다 가파르게 지원해야하는 데이터들의 양이 많아짐에 따라 scalable한 환경구성이 필수요소였는데 이 또한 airflow에서 지원이 되기 때문에 크게 고민할 이유가 없었다.
Airflow 를 잘 활용하기 위해서 k8s 구조에 airflow 를 돌릴 수 있도록 하였다.
초기 인프라 구성은 AWS 의 MWAA를 이용하여 사용하려고 했었다. 하지만 인프라 구성중 인프라 운영비용 감축을 해야하는 이슈가 생겨서 mwaa를 사용하지않고 AWS EKS를 이용하여 직접 구축하기로 하였다.
AWS EKS 환경에 airflow framework 를 구성했다.
on-demand type의 node에 webserver, scheduler, monitoring ( prometheus & grafana ) 를 띄울 수 있도록 하였고 worker node같은 경우는 spot node를 활용하였다. 성격상 worker node는 계속해서 떠있지 않고 scheudling이 됐을 때만 실행되기 때문에 spot 이 좀 더 맞다고 판단했다. ( 가격적인 문제도 무시할 수 없음 )
Spot node group은 종류에 따라 pod_template을 구성하여서 사용할 수 있게 하였다.
Webserver UI 를 사용하기 위해서 Webserver node 에 ALB를 달았다.
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: monitoring-grafana-ingress
namespace: monitoring
labels:
name: monitoring-grafana-ingress
owner: ******
env: ${AIRFLOW_ENV}
annotations:
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/target-type: ip
alb.ingress.kubernetes.io/load-balancer-name: ******
alb.ingress.kubernetes.io/tags: ******
alb.ingress.kubernetes.io/success-codes: 200,404,301,302
alb.ingress.kubernetes.io/certificate-arn: ${AWS_SSL_ENDPOINT}
alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
alb.ingress.kubernetes.io/ssl-redirect: '443'
alb.ingress.kubernetes.io/group.name: tech
alb.ingress.kubernetes.io/group.order: '1'
spec:
ingressClassName: alb
rules:
- http:
paths:
- path: /monitoring
pathType: Prefix
backend:
service:
name: monitoring-grafana-service
port:
number: 80
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: airflow-celery-flower-ingress
namespace: airflow
labels:
name: airflow-celery-flower-ingress
owner: ******
env: ${AIRFLOW_ENV}
annotations:
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/target-type: ip
alb.ingress.kubernetes.io/load-balancer-name: ******
alb.ingress.kubernetes.io/tags: ******
alb.ingress.kubernetes.io/success-codes: 200,404,301,302
alb.ingress.kubernetes.io/certificate-arn: ${AWS_SSL_ENDPOINT}
alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
alb.ingress.kubernetes.io/ssl-redirect: '443'
alb.ingress.kubernetes.io/group.name: tech
alb.ingress.kubernetes.io/group.order: '2'
spec:
ingressClassName: alb
rules:
- http:
paths:
- path: /flower
pathType: Prefix
backend:
service:
name: airflow-celery-flower-service
port:
number: 80
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: airflow-agg-metric-service
namespace: airflow
labels:
name: airflow-agg-metric-service
owner: ******
env: ${AIRFLOW_ENV}
annotations:
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/target-type: ip
alb.ingress.kubernetes.io/load-balancer-name: ******
alb.ingress.kubernetes.io/tags: ******
alb.ingress.kubernetes.io/success-codes: 200,404,301,302
alb.ingress.kubernetes.io/certificate-arn: ${AWS_SSL_ENDPOINT}
alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
alb.ingress.kubernetes.io/ssl-redirect: '443'
alb.ingress.kubernetes.io/group.name: tech
alb.ingress.kubernetes.io/group.order: '3'
spec:
ingressClassName: alb
rules:
- http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: airflow-agg-metric-service
port:
number: 80
root를 webserver로 설정하고 해당 Loadbalancer에 monitoring, flower endpoint를 추가했다.
여러 ingress를 하나의 loadbalancer로 만들기 위해서
alb.ingress.kubernetes.io/group.name 을 설정하고 각 ingress 에 group.order 값을 순서대로 준다. 이 때 root path 는 가장 아래쪽으로 배치를 해줘야 위에 endpoint들이 제대로 동작한다. (맨 첫번째 order로 가게되면 무조건 root 로 redirect 된다. )
airflow 의 worker들의 log 정보를 가져오기 위해서는 remote 저장소를 이용해야한다. AWS에서는 s3, efs 등이 사용되는데 이 중 efs 를 사용하기로 했다.
EFS 를 사용하기 위해서는 CSIDriver 를 설정해줘야한다. CSI란 Kubernetes에 임의의 스토리지를 연동시켜주는 오픈형 API이다. Kubernetes 에서 제공하지 않은 외부 스토리지 서비스를 연동하기위해 사용한다고 생각하면 된다.
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
name: efs.csi.aws.com
labels:
name: efs.csi.aws.com
owner: jhkim
env: ${AIRFLOW_ENV}
spec:
attachRequired: false
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: airflow.efs-csi.sc
labels:
name: airflow.efs-csi.sc
owner: jhkim
env: ${AIRFLOW_ENV}
provisioner: efs.csi.aws.com
mountOptions: #these options
- uid=50000
- gid=0
CSIDriver object를 만들어주고 이를 Storageclass에 연동해서 만들어주고 PV, PVC를 해당 StorageClass 에 연동해주면 된다.
Airflow 의 기본 component들과 dag을 함께 관리하게 되면 dag의 자잘한 수정과정이 있을 때 마다 전체 deploy를 해야하는 문제가 있다.
이 때문에 실제 사용되는 Dag 은 별도의 Repository로 관리를 하고 이를 git-sync sidecar 패턴을 이용하여 적용하기로 했다.
sidecar 패턴이란 말그대로 main container 옆을 보조하면서 떠있는 side container를 띄우는 것이다. side container 와의 volume을 공유하면서 side container 에게는 volume을 git-hub remote repo와 sync를 맞추게 하고 main container는 본인의 job을 하는 것이다.
- name: git-sync
image: k8s.gcr.io/git-sync:v3.1.1
envFrom:
- configMapRef:
name: airflow.git-sync.cm
volumeMounts:
- name: airflow-dag
mountPath: "/opt/airflow/dags"
securityContext:
runAsUser: 65533
runAsGroup: 0
allowPrivilegeEscalation: false
capabilities:
drop: [ "all" ]
deployment를 구성할 때 다음과 같이 git-sync container 설정하여 airflow-dag이라는 이름으로 volume-mount를 하고 deployment 의 volume을 webserver, scheduler 가 공유하면서 git-sync 가 sync를 맞추고 있는 volume을 각 container들이 바라볼 수 있게 설정했다.
추가적으로 git-sync에서 사용할 environment 값은 아래와 같다.
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow.git-sync.cm
namespace: airflow
labels:
name: airflow.git-sync.cm
owner: jhkim
env: ${AIRFLOW_ENV}
data:
GIT_SYNC_REPO: "https://${AIRFLOW_GIT_USER}:${AIRFLOW_GIT_PASSWORD}@github.com/Your username/your-repo.git"
GIT_SYNC_BRANCH: develop # repo의 어떤 branch를 이용할 것인가.
GIT_SYNC_ROOT: "/opt/airflow/dags" # root path 설정
GIT_SYNC_DEST: "repo 가 clone될 folder의 이름 설정하지 않으면 repo 이름 그대로 적용된다."
GIT_SYNC_DEPTH: "1" # commit history 깊이를 어느정도로 할 것인가 설정하지 않으면 모든 commit history를 가져옴.
GIT_SYNC_ONE_TIME: "false" # git-sync once or repe
GIT_SYNC_WAIT: "60" # sync 주기
GIT_SYNC_USERNAME: "Jihwan Kim"
GIT_SYNC_PERMISSIONS: "0755" repo chmod 값
GIT_KNOWN_HOSTS: "false"
GIT_SYNC_PASSWORD: "password"
git-sync 를 사용하게 되면 Dag repo 와 airflow 구성 repo를 따로 관리하기 때문에 잦은 전체 deploy를 하지 않을 수 있고 dag을 좀 더 효율적으로 관리할 수 있게 된다.
현재는 main 브랜치에 merge가 이루어지면 github action을 통해서 docker image build를 진행하고 이를 만들어둔 shell script를 동작시켜서 운영 환경에 배포하는 방식으로 배포가 이루어지고 있다. 해당 방식으로 자동화를 하기는 했으나 그 기능이 제한적이어서 ( 오로지 배포만을 위한 동작 ) argoCD를 이용한 버전관리, Auto failover 기능 등을 적용하여 좀 더 단단한 인프라를 만들 필요가 있다.
현재 단일한 dev 환경을 제공하고 있어 특정 개발자가 dag 테스트를 하려고 하면 새로 deploy를 해야해서 다른 작업자가 테스트하기 어려운 상황이다. 이를 위해서 각각에게 개발환경을 제공할 필요가 있다.
https://aws.amazon.com/ko/blogs/containers/running-airflow-workflow-jobs-on-amazon-eks-spot-nodes/
https://github.com/kubernetes/git-sync