Apache Airflow는 워크플로우 자동화 및 배치 처리를 위한 오픈소스 도구이다. DAG(Directed Acyclic Graph)를 기반으로 워크플로우를 정의하고, 실행 상태를 모니터링하며, 복잡한 데이터 파이프라인을 손쉽게 관리할 수 있도록 도와준다.
| 구성 요소 | 설명 |
|---|---|
| DAG (Workflow) | Task들의 실행 순서를 정의하는 그래프 |
| Operator | 개별 작업을 정의하는 단위 (ex: BashOperator, PythonOperator, SQLOperator 등) |
| Task | DAG 내에서 실행되는 개별 작업 |
| Scheduler | DAG 실행을 스케줄링하고 Task를 트리거하는 역할 |
| Executor | Task 실행을 담당하는 엔진 (LocalExecutor, CeleryExecutor, KubernetesExecutor 등) |
| Web UI | DAG 및 Task 상태를 모니터링할 수 있는 웹 기반 대시보드 |
Apache Airflow의 공식 Docker 이미지에는 일반 버전(Full)과 Slim 버전이 있다. 기본적으로 포함된 패키지의 수와 이미지 크기에서 차이가 난다. Airflow를 처음 사용하거나 빠르게 테스트하고 싶을 때나
대부분의 기능을 바로 사용하고 싶을 때 일반 버전을 사용한다. 운영 환경에서 불필요한 패키지를 줄이고, 보안 이슈로 최소한의 패키지만 포함한 최적화된 환경을 구성할 때 slim 버전을 사용한다.
Airflow는 기본적으로 SQLite를 사용하며, 로컬 테스트 용도로만 적합하다. airflow.cfg에서 sql_alchemy_conn = sqlite:////path/to/airflow.db와 같은 설정이 기본값이다.
SQLite는 단일 프로세스에서만 안정적으로 동작하며, 여러 개의 Airflow 서비스(Pod, Container)와 공유할 수 없다. 운영 환경에서는 다중 인스턴스 지원을 위해 PostgreSQL 또는 MySQL을 사용해야 한다.
프로덕션 환경에서는 PostgreSQL 또는 MySQL을 사용해야 한다. PostgreSQL을 사용하려면 직접 설치하고 설정해야 한다.
| 도구 | 특징 | 적합한 경우 |
|---|---|---|
| Argo Workflows | DAG 기반 Kubernetes 워크플로우, UI 제공 | Kubernetes에서 복잡한 배치를 운영하고 싶은 경우 |
| Prefect | Python 코드 기반, Airflow보다 가벼움 | 개발 친화적인 워크플로우 관리가 필요한 경우 |
| KubeCron | CronJob 개선, 모니터링/재시도 기능 제공 | 기존 CronJob을 유지하면서 기능을 확장하고 싶은 경우 |
| Kestra | YAML 기반 설정, UI 제공 | 간편하게 Kubernetes 배치를 운영하고 싶은 경우 |
운영환경에는 Helm 사용을 통해 PostgreSql 등으로 메타 데이터베이스를 교체하고, REDIS 사용을 추천한다. 아래는 Airflow 테스트를 위해 약식 설치 방식이다.
kubectl create namespace airflow
kubectl apply -f airflow-deploy.yml
kubectl apply -f airflow-svc.yml
airflow-svc.yml
apiVersion: v1
kind: Service
metadata:
name: airflow
namespace: airflow
spec:
selector:
app: airflow
ports:
- protocol: TCP
port: 8080
targetPort: 8080
type: NodePort
airflow-deploy.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow
namespace: airflow
spec:
replicas: 1 # SQLite는 1개 Replica만 가능
selector:
matchLabels:
app: airflow
template:
metadata:
labels:
app: airflow
spec:
securityContext:
fsGroup: 50000
containers:
- name: airflow
image: apache/airflow:latest
securityContext:
runAsUser: 50000
imagePullPolicy: IfNotPresent
env:
- name: AIRFLOW__CORE__EXECUTOR
value: "SequentialExecutor"
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: "sqlite:////opt/airflow/airflow.db"
volumeMounts:
- name: airflow-data
mountPath: /opt/airflow
command: ["bash", "-c"]
args:
- |
airflow db init &&
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com &&
airflow webserver &&
airflow scheduler
volumes:
- name: airflow-data
persistentVolumeClaim:
claimName: mypvc
vagrant@slave2:~$ k get svc -n airflow
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
airflow NodePort 10.98.56.159 <none> 8080:30838/TCP 16m
웹콘솔은 위의 노드포트를 통해 접속 가능하다. 기본 계정은 admin/admin 이다.
vagrant@slave2:~$ k exec airflow-6bf7b46c79-scvks -n airflow -- airflow version
2.10.5
airflow@airflow-6bf7b46c79-scvks:/opt/airflow$ airflow config get-value core dags_folder
/opt/airflow/dags
airflow@airflow-6bf7b46c79-scvks:/opt/airflow$ airflow dags list
dag_id | fileloc | owners | is_paused
==========+========================+=========+==========
hello_dag | /opt/airflow/dags/h.py | airflow | True
airflow@airflow-5db85d78ff-x8tqw:/opt/airflow/dags$ airflow dags show hello_dag
[2025-03-10T11:02:49.242+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
digraph hello_dag {
graph [label=hello_dag labelloc=t rankdir=LR]
print_date [color="#000000" fillcolor="#f0ede4" label=print_date shape=rectangle style="filled,rounded"]
print_hello [color="#000000" fillcolor="#f0ede4" label=print_hello shape=rectangle style="filled,rounded"]
print_date -> print_hello
}
airflow@airflow-5db85d78ff-x8tqw:/opt/airflow/dags$ cat $AIRFLOW_HOME/logs/scheduler/latest/*.log | grep "DAG"
[2025-03-10T11:01:25.045+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:01:25.138+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.137+0000] {override.py:1912} INFO - Created Permission View: can read on DAG:hello_dag
[2025-03-10T11:01:25.150+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.150+0000] {override.py:1912} INFO - Created Permission View: can edit on DAG:hello_dag
[2025-03-10T11:01:25.159+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.158+0000] {override.py:1912} INFO - Created Permission View: can delete on DAG:hello_dag
[2025-03-10T11:01:25.174+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.174+0000] {override.py:1912} INFO - Created Permission View: can read on DAG Run:hello_dag
[2025-03-10T11:01:25.184+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.183+0000] {override.py:1912} INFO - Created Permission View: menu access on DAG Run:hello_dag
[2025-03-10T11:01:25.192+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.192+0000] {override.py:1912} INFO - Created Permission View: can create on DAG Run:hello_dag
[2025-03-10T11:01:25.202+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.202+0000] {override.py:1912} INFO - Created Permission View: can delete on DAG Run:hello_dag
[2025-03-10T11:01:25.202+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.202+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:01:25.211+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.211+0000] {dag.py:3262} INFO - Creating ORM DAG for hello_dag
[2025-03-10T11:01:56.055+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:01:56.073+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:56.072+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:02:27.136+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:02:27.150+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:02:27.150+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:02:58.040+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:02:58.057+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:02:58.057+0000] {dag.py:3239} INFO - Sync 1 DAGs
h.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 6, 29),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'hello_dag',
default_args=default_args,
description='Hello DAG',
schedule_interval=timedelta(days=1),
)
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
task2 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
task1 >> task2
vagrant@slave2:~$ k get po -n airflow
NAME READY STATUS RESTARTS AGE
airflow-6bf7b46c79-scvks 1/1 Running 1 (33m ago) 26h
kubectl cp hello.py airflow-6bf7b46c79-scvks:/opt/airflow/dags -n airflow
위에서 복사한 DAG가 웹콘솔에 나타나는 것을 확인한다.
Apache Airflow는 워크플로우 자동화 및 배치 처리를 위한 오픈소스 도구이다. DAG(Directed Acyclic Graph)를 기반으로 워크플로우를 정의하고, 실행 상태를 모니터링하며, 복잡한 데이터 파이프라인을 손쉽게 관리할 수 있도록 도와준다.
| 구성 요소 | 설명 |
|---|---|
| DAG (Workflow) | Task들의 실행 순서를 정의하는 그래프 |
| Operator | 개별 작업을 정의하는 단위 (ex: BashOperator, PythonOperator, SQLOperator 등) |
| Task | DAG 내에서 실행되는 개별 작업 |
| Scheduler | DAG 실행을 스케줄링하고 Task를 트리거하는 역할 |
| Executor | Task 실행을 담당하는 엔진 (LocalExecutor, CeleryExecutor, KubernetesExecutor 등) |
| Web UI | DAG 및 Task 상태를 모니터링할 수 있는 웹 기반 대시보드 |
Apache Airflow의 공식 Docker 이미지에는 일반 버전(Full)과 Slim 버전이 있다. 기본적으로 포함된 패키지의 수와 이미지 크기에서 차이가 난다. Airflow를 처음 사용하거나 빠르게 테스트하고 싶을 때나
대부분의 기능을 바로 사용하고 싶을 때 일반 버전을 사용한다. 운영 환경에서 불필요한 패키지를 줄이고, 보안 이슈로 최소한의 패키지만 포함한 최적화된 환경을 구성할 때 slim 버전을 사용한다.
Airflow는 기본적으로 SQLite를 사용하며, 로컬 테스트 용도로만 적합하다. airflow.cfg에서 sql_alchemy_conn = sqlite:////path/to/airflow.db와 같은 설정이 기본값이다.
SQLite는 단일 프로세스에서만 안정적으로 동작하며, 여러 개의 Airflow 서비스(Pod, Container)와 공유할 수 없다. 운영 환경에서는 다중 인스턴스 지원을 위해 PostgreSQL 또는 MySQL을 사용해야 한다.
프로덕션 환경에서는 PostgreSQL 또는 MySQL을 사용해야 한다. PostgreSQL을 사용하려면 직접 설치하고 설정해야 한다.
| 도구 | 특징 | 적합한 경우 |
|---|---|---|
| Argo Workflows | DAG 기반 Kubernetes 워크플로우, UI 제공 | Kubernetes에서 복잡한 배치를 운영하고 싶은 경우 |
| Prefect | Python 코드 기반, Airflow보다 가벼움 | 개발 친화적인 워크플로우 관리가 필요한 경우 |
| KubeCron | CronJob 개선, 모니터링/재시도 기능 제공 | 기존 CronJob을 유지하면서 기능을 확장하고 싶은 경우 |
| Kestra | YAML 기반 설정, UI 제공 | 간편하게 Kubernetes 배치를 운영하고 싶은 경우 |
운영환경에는 Helm 사용을 통해 PostgreSql 등으로 메타 데이터베이스를 교체하고, REDIS 사용을 추천한다. 아래는 Airflow 테스트를 위해 약식 설치 방식이다.
kubectl create namespace airflow
kubectl apply -f airflow-deploy.yml
kubectl apply -f airflow-svc.yml
airflow-svc.yml
apiVersion: v1
kind: Service
metadata:
name: airflow
namespace: airflow
spec:
selector:
app: airflow
ports:
- protocol: TCP
port: 8080
targetPort: 8080
type: NodePort
airflow-deploy.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow
namespace: airflow
spec:
replicas: 1 # SQLite는 1개 Replica만 가능
selector:
matchLabels:
app: airflow
template:
metadata:
labels:
app: airflow
spec:
securityContext:
fsGroup: 50000
containers:
- name: airflow
image: apache/airflow:latest
securityContext:
runAsUser: 50000
imagePullPolicy: IfNotPresent
env:
- name: AIRFLOW__CORE__EXECUTOR
value: "SequentialExecutor"
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: "sqlite:////opt/airflow/airflow.db"
volumeMounts:
- name: airflow-data
mountPath: /opt/airflow
command: ["bash", "-c"]
args:
- |
airflow db init &&
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com &&
airflow webserver &&
airflow scheduler
volumes:
- name: airflow-data
persistentVolumeClaim:
claimName: mypvc
vagrant@slave2:~$ k get svc -n airflow
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
airflow NodePort 10.98.56.159 <none> 8080:30838/TCP 16m
웹콘솔은 위의 노드포트를 통해 접속 가능하다. 기본 계정은 admin/admin 이다.
vagrant@slave2:~$ k exec airflow-6bf7b46c79-scvks -n airflow -- airflow version
2.10.5
airflow@airflow-6bf7b46c79-scvks:/opt/airflow$ airflow config get-value core dags_folder
/opt/airflow/dags
airflow@airflow-6bf7b46c79-scvks:/opt/airflow$ airflow dags list
dag_id | fileloc | owners | is_paused
==========+========================+=========+==========
hello_dag | /opt/airflow/dags/h.py | airflow | True
airflow@airflow-5db85d78ff-x8tqw:/opt/airflow/dags$ airflow dags show hello_dag
[2025-03-10T11:02:49.242+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
digraph hello_dag {
graph [label=hello_dag labelloc=t rankdir=LR]
print_date [color="#000000" fillcolor="#f0ede4" label=print_date shape=rectangle style="filled,rounded"]
print_hello [color="#000000" fillcolor="#f0ede4" label=print_hello shape=rectangle style="filled,rounded"]
print_date -> print_hello
}
airflow@airflow-5db85d78ff-x8tqw:/opt/airflow/dags$ cat $AIRFLOW_HOME/logs/scheduler/latest/*.log | grep "DAG"
[2025-03-10T11:01:25.045+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:01:25.138+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.137+0000] {override.py:1912} INFO - Created Permission View: can read on DAG:hello_dag
[2025-03-10T11:01:25.150+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.150+0000] {override.py:1912} INFO - Created Permission View: can edit on DAG:hello_dag
[2025-03-10T11:01:25.159+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.158+0000] {override.py:1912} INFO - Created Permission View: can delete on DAG:hello_dag
[2025-03-10T11:01:25.174+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.174+0000] {override.py:1912} INFO - Created Permission View: can read on DAG Run:hello_dag
[2025-03-10T11:01:25.184+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.183+0000] {override.py:1912} INFO - Created Permission View: menu access on DAG Run:hello_dag
[2025-03-10T11:01:25.192+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.192+0000] {override.py:1912} INFO - Created Permission View: can create on DAG Run:hello_dag
[2025-03-10T11:01:25.202+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.202+0000] {override.py:1912} INFO - Created Permission View: can delete on DAG Run:hello_dag
[2025-03-10T11:01:25.202+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.202+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:01:25.211+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:25.211+0000] {dag.py:3262} INFO - Creating ORM DAG for hello_dag
[2025-03-10T11:01:56.055+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:01:56.073+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:01:56.072+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:02:27.136+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:02:27.150+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:02:27.150+0000] {dag.py:3239} INFO - Sync 1 DAGs
[2025-03-10T11:02:58.040+0000] {processor.py:925} INFO - DAG(s) 'hello_dag' retrieved from /opt/airflow/dags/h.py
[2025-03-10T11:02:58.057+0000] {logging_mixin.py:190} INFO - [2025-03-10T11:02:58.057+0000] {dag.py:3239} INFO - Sync 1 DAGs
h.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 6, 29),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'hello_dag',
default_args=default_args,
description='Hello DAG',
schedule_interval=timedelta(days=1),
)
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
task2 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
task1 >> task2
vagrant@slave2:~$ k get po -n airflow
NAME READY STATUS RESTARTS AGE
airflow-6bf7b46c79-scvks 1/1 Running 1 (33m ago) 26h
kubectl cp hello.py airflow-6bf7b46c79-scvks:/opt/airflow/dags -n airflow
위에서 복사한 DAG가 웹콘솔에 나타나는 것을 확인한다.
