Airflow 2.0 setup with Docker

hansung.dev·2021년 2월 28일
1
post-thumbnail

Airflow 2.0는 2020년 12월에 Release 되었습니다. 데이터 파이프라인 구축 시 운영을 도와주는 Airflow에 대해서 알아보도록 하겠습니다. 비슷한 도구는 Oozie, Luigi, Azkaban, Jenkins Pipeline 등이 있습니다.

Apache Airflow는 오픈 소스 워크 플로우 관리 플랫폼입니다. 2014년 10월 에어 비앤비에서 회사의 점점 복잡 해지는 워크 플로우를 관리하기위한 솔루션으로 시작했습니다. 위키백과

Getting Started

Airflow 아키텍처를 살펴보고 docker-compose로 서비스를 시작합니다.

Basic Airflow architecture

실습 환경은 아래와 같습니다.

Docker : 20.10.2
Docker Compose : 1.27.4, build 40524192
Python : 3.9.0
Airflow : 2.0.1

docker-compose 디렉토리 구조 및 파일을 살펴보도록 하겠습니다.

File Directory with Docker-Compose

airflow-2.0.1
├── docker-compose.yaml
├── dags
│   └── hello_world.py
├── logs
└── plugins

docker-compose.yml

Airflow 아키텍처의 postgres, redis, webserver, scheduler, worker 서비스들을 구성 합니다.

version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__WEBSERVER__WORKERS: 1
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:

dags/hello_world.py

DAGs에 PythonOperator를 이용해서 hello world를 구현해봅니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='1 * * * *',
          start_date=datetime(2021, 2, 28), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator

Setting up and running

Docker-Compose Run

docker-compose를 실행하여 airflow 서비스를 시작합니다.

# init
mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
docker-compose up airflow-init

# Docker-Compose Run
docker-compose up

# Docker-Compose Stop
# docker-compose down --volumes --rmi all

Quick start > Running Airflow in Docker

Trouble Shooting

현상 : Some workers seem to have died and gunicorn did not restart them as expected
원인/분석 : Worker 기본 4개가 기동되는데 기동 중 메모리가 부족 하여 webServer가 반복적으로 재기동되는 현상.
조치 : 메모리 증설 또는 Worker 기동 수를 조정합니다. (4 > 3 ~ 1)

"docker-compose.yaml"에 Webserver의 Workers수를 기본값 보다 낮은 값으로 지정 후 기동 합니다.

version: '3'
....
  environment:
    &airflow-common-env
    ......
    AIRFLOW__WEBSERVER__WORKERS: 1  << 추가

Connect to the Airflow

http://localhost:8080/ 에 접속합니다. id/pw는 airflow/airflow를 입력합니다.

접속하면 DAGs 목록을 확인할수 있습니다.

hello world Dag의 실행 이력을 확인할수 있습니다. 1분 단위로 실행되록 설정되어있습니다.

github 에서 모든 코드를 확인할수 있습니다.

profile
Data Engineer

1개의 댓글

comment-user-thumbnail
2021년 8월 3일

안녕하세요
질문이 있어서 댓글을 남기게되었습니다.

  • docker-compose up airflow-init <- airflow-init은 서비스 명일까요??
  • echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env 이 명령어가 의미하는 것은 먼가요??
답글 달기