Airflow 2.0는 2020년 12월에 Release 되었습니다. 데이터 파이프라인 구축 시 운영을 도와주는 Airflow에 대해서 알아보도록 하겠습니다. 비슷한 도구는 Oozie, Luigi, Azkaban, Jenkins Pipeline 등이 있습니다.
Apache Airflow는 오픈 소스 워크 플로우 관리 플랫폼입니다. 2014년 10월 에어 비앤비에서 회사의 점점 복잡 해지는 워크 플로우를 관리하기위한 솔루션으로 시작했습니다. 위키백과
Airflow 아키텍처를 살펴보고 docker-compose로 서비스를 시작합니다.
실습 환경은 아래와 같습니다.
Docker : 20.10.2
Docker Compose : 1.27.4, build 40524192
Python : 3.9.0
Airflow : 2.0.1
docker-compose 디렉토리 구조 및 파일을 살펴보도록 하겠습니다.
airflow-2.0.1
├── docker-compose.yaml
├── dags
│ └── hello_world.py
├── logs
└── plugins
Airflow 아키텍처의 postgres, redis, webserver, scheduler, worker 서비스들을 구성 합니다.
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.1}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__WEBSERVER__WORKERS: 1
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
restart: always
airflow-worker:
<<: *airflow-common
command: celery worker
restart: always
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
volumes:
postgres-db-volume:
DAGs에 PythonOperator를 이용해서 hello world를 구현해봅니다.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='1 * * * *',
start_date=datetime(2021, 2, 28), catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
dummy_operator >> hello_operator
docker-compose를 실행하여 airflow 서비스를 시작합니다.
# init
mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
docker-compose up airflow-init
# Docker-Compose Run
docker-compose up
# Docker-Compose Stop
# docker-compose down --volumes --rmi all
Quick start > Running Airflow in Docker
현상 : Some workers seem to have died and gunicorn did not restart them as expected
원인/분석 : Worker 기본 4개가 기동되는데 기동 중 메모리가 부족 하여 webServer가 반복적으로 재기동되는 현상.
조치 : 메모리 증설 또는 Worker 기동 수를 조정합니다. (4 > 3 ~ 1)
"docker-compose.yaml"에 Webserver의 Workers수를 기본값 보다 낮은 값으로 지정 후 기동 합니다.
version: '3'
....
environment:
&airflow-common-env
......
AIRFLOW__WEBSERVER__WORKERS: 1 << 추가
http://localhost:8080/ 에 접속합니다. id/pw는 airflow/airflow를 입력합니다.
접속하면 DAGs 목록을 확인할수 있습니다.
hello world Dag의 실행 이력을 확인할수 있습니다. 1분 단위로 실행되록 설정되어있습니다.
github 에서 모든 코드를 확인할수 있습니다.
안녕하세요
질문이 있어서 댓글을 남기게되었습니다.