졸업프로젝트 과제로 ADAS Simulation을 위한 클라우드 인프라 구축을 진행하게 되었다. 자율주행 시뮬레이션은 데이터 수집, 전처리, 시뮬레이션 실행, 결과 분석 등 다양한 단계로 구성된다. 이때 방향성 비순환 그래프(DAG)로 모델링하면, 각 단계 간의 종속성과 순서를 정의할 수 있어 올바른 순서로 작업이 실행되도록 보장한다. 이를 통해 잡한 워크플로를 체계적으로 모델링하고 관리할 수 있다.
이런 이유 때문에 이미 Airflow를 시뮬레이션에서 사용하는 사례를 본적이 있지만, 나는 HPC 쿠버네티스 클러스터 구성을 통해 시뮬레이션을 개선하는 과제를 목표로 하고 있어, k8s에 더 친화적인 Argo Workflow를 사용하기로 했다.
그러나, Airflow도 k8s를 지원하긴 해서,,,(물론 k8s면 그냥 argo workflow가 쓰는게) 둘의 차이에 대해서 잘 모르고 있는 것 같아서 이참에 둘을 비교해 보려고 한다.

워크플로우는 특정 순서에 따라 발생하는 반복적이고 체계적인 프로세스와 작업을 관리하는 시스템이다.
위 사진처럼 순서들의 흐름을 하나의 목표를 수행하는 것을 하나의 워크플로우라고 하고, 이러한 워크플로우들이 개발자가 정해놓은 간격으로 수행되게 예약하기, 그리고 그것들이 잘 동작했는지, 모니터링, 시각화 한 툴들이 워크플로우 관리 시스템이다.
DAG는 Directed Acyclic Graph의 약자로 방향성 있는 비순환 그래프라고 불린다. 예를 들어, ETL 파이프라인을 구축한다고 하면 각각의 Taks는 Extract, Trnasform, Load일 것이고 다음과 같은 화살표를 가진 DAG를 만들 수 있다.

복잡한 DAG는 다음과 같이 구성 가능하다.

여러 작업이 있을 때 병렬 작업까지 가능하다.
자세히는 다루지 않겠다. 아래 블로그를 참고하면 좋을듯하다.
https://steemit.com/dag/@cryptodreamers/dag-dag-directed-acyclic-graph



처음에는 단순 kubernetes 호환성만 고려해서 Kubeflow와 Argo Workflow 중에도 고민을 했었다. Kubeflow가 어차피 내부적으로 Argo Workflow를 사용하기도(워크플로우 orchestration) 하지만, 이의 목적은 MLOps 전반을 다루는 포괄적인 플랫폼이다. ADAS Simulation에서 어떤 시나리오들 간에는 AI적 요소가 들어가면 더 좋을 수도 있겠다고는 생각하지만, 내 능력 밖이기도 하고 지금은 최소한의 연구 목표(k8s로 hpc환경 구축)만 만족해야하므로 더 순수한 워크플로우 관리 도구인 Argo Workflow를 사용하기로 했다.
| 구분 | Airflow | Argo workflow |
|---|---|---|
| UI | webserver | Argo UI |
| 스케줄러 | scheduler | kube-scheduler |
| 메세지 큐 | Celery | ARgo Controller |
| 메타데이터 DB | PostgreSQL | etcd |
| Worker | Worker | Node |
| DAG 정의 | Python script | YAML |
| Job 단위 | Operator | Container |
확장성

Argo workflow가 구글 트렌드로 봤을 때 확실히 화제성은 부족한 것 같다.. Github star로 봐도 그렇고,,
다만, 내가 구축하려는 시뮬레이션 환경에서 실제 시나리오는 매우 복잡할 수 있다. 따라서 요구 사항에 따라 Airflow로 데이터 수집, 전처리, 모델 학습 등의 복잡한 워크플로우를 정의하거나, 개별 시나리오를 더 구체화 하여 세분화 해서 Argo Workflow의 장점을 살리는 법도 고민해봐야겠다.
아래는 동일한 내용을 각각 순서대로 Airflow(Python)와 Argo Workflow(Yaml)에서 정의하는 예시이다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
with DAG('example_dag', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
task_A = BashOperator(
task_id='task_A',
bash_command='echo hello A'
)
task_B = BashOperator(
task_id='task_B',
bash_command='echo hello B',
depends_on_past=True,
trigger_rule='all_success'
)
task_C = BashOperator(
task_id='task_C',
bash_command='echo hello C',
depends_on_past=True,
trigger_rule='all_success'
)
task_A >> task_B >> task_C
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-dag-
spec:
entrypoint: dag-example
templates:
- name: dag-example
dag:
tasks:
- name: A
template: A
- name: B
template: B
dependencies: [A]
- name: C
template: C
dependencies: [B]
- name: A
container:
image: alpine:3.7
command: [echo, hello A]
- name: B
container:
image: alpine:3.7
command: [echo, hello B]
- name: C
container:
image: alpine:3.7
command: [echo, hello C]
참고