AirFlow 설치(Celery Cluster)

Q·2022년 12월 13일
3

개요

  • 처음 Airflow를 구성했을때 LocalExecutor를 사용했습니다.
  • 하지만 하나의 인스턴스에 실행시킨다면 인스턴스에 문제가 생겼을때 Airflow도 동작하지 않을 수 있는 문제가 있고, 인프라가 커졌을 때 확장이 힘든 문제가 생겼습니다.
  • 따라서 리소스가 필요할 때 worker의 갯수만 늘리면 되고, 리소스가 필요 없을 때 불필요하게 낭비하지 않고 worker의 갯수를 줄일 수 있는 클러스터로 구축하기로 결정했습니다.
  • 이에 적합한 Executor로는 Celery Executorkubernetes Executor가 있는데 저는 이 중 Celery Executor를 사용, Master Node와 Worker Node로 분리해서 Cluster를 구성하기로 결정했습니다.
  • Master node에는 airflow에서 공식적으로 제공하는 docker-compose celery를 사용 하고 Worker node에는 따로 Dockerfile을 만들어 사용할 것 입니다.

Celery Executor

Celery Executor는 Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식입니다. Worker 수를 스케일아웃 할 수 있다는 장점이 있지만, 메시지 브로커를 따로 관리해야하고 워커 프로세스에 대한 모니터링도 필요하다는 단점이 있습니다.

구성

airflow webserver

  • airflow UI
  • workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인

airflow scheduler

  • 작업 기준이 충족되는지 여부를 확인
  • 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
  • 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함

airflow celery worker

  • 할당된 작업을 실행하는 실질적인 일꾼
  • 여러개의 worker로 작업
  • default는 한개의 worker로 로컬에서 작업이 돌아감
  • celery를 따로 설치하지 않고, airflow 설치 후 airflow celery worker -H worker_name -q queue_name으로 실행

airflow celery flower

  • celery UI
  • worker 상태, queue 등 확인

Database

  • tasks, DAGs, 변수, connections 정보들 등의 상태에 대한 정보등 메타데이터 저장

redis

  • Key, Value 구조의 비정형 데이터를 저장하고 관리하기 위한 오픈 소스 기반의 비관계형 데이터 베이스 관리 시스템 (DBMS)
  • 데이터베이스, 캐시, 메세지 브로커로 사용되며 인메모리 데이터 구조를 가진 저장소
  • message broker를 redis로 사용
  • 실행할 명령을 queue에 저장

result backend

  • 완료된 명령의 상태 저장

통신 메커니즘 정리

1. Web server –> Workers : 작업 실행 로그 가져오기
2. Web server –> DAG files : DAG 구조 공개
3. Web server –> Database : 작업 상태 가져오기
4. Workers –> DAG files : DAG 구조 공개 및 작업 실행
5. Workers –> Database : 연결 구성, 변수 및 XCOM에 대한 정보를 가져오고 저장합니다.
6. Workers –> Celery’s result backend : 작업 상태 저장
7. Workers –> Celery’s broker : 실행 명령 저장
8. Scheduler –> DAG files : DAG 구조 공개 및 작업 실행
9. Scheduler –> Database : DAG 실행 및 관련 작업 저장
10. Scheduler –> Celery’s result backend : 완료된 작업의 상태에 대한 정보를 가져옵니다.
11. Scheduler –> Celery’s broker : 실행할 명령을 입력합니다.


설치

Server 구성

Servermasterworker01worker02worker03
OScentos7centos7centos7centos7
Disk Size1000G1000G1000G1000G
Memory32G16G16G16G
Processors12121212

전제 조건(Prerequisites)

1. 필수 소프트웨어 (Required Software)

  • Docker
  • Docker-compose
  • Python3.X(6,7,8)

2. 소프트웨어 설치 (Installing Software)

3. Master Node 구성

  • airflow scheduler
  • airflow webserver
  • airflow trigger
  • airflow flower
  • mysql
  • redis

4. Worker Node 구성

  • airflow worker

5. 서버의 접속한 계정에 sudo 권한

6. /etc 에 접근할 수 있는 권한

7. 사용한 hostname들은 (master, worker01, worker02, worker03) 은 예시 입니다.

※ 이 3가지(Docker, Docker-compose, Python) 소프트웨어는 모든 Server에 필히 설치되어 있어야 합니다.


Master Node 설치 (Docker-compose)

1. 사전 작업

  • docker-compose.yaml 파일을 받을 디렉토리에서 폴더를 생성해줍니다.
    $ mkdir ~/airflow_master ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins
    $ sudo chmod 777 ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins
    $ sudo chown (유저):root ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins

    ./dags : DAG 파일을 집어 넣을 곳
    ./logs : task와 scheduler의 log를 넣는 곳
    ./plugins : 커스텀 플러그인을 넣는 곳

2. docker-compose.yaml 다운

  • 위에서 생성한 docker-compose.yaml 파일을 받을 디렉토리에 접속
    • $ cd ~/airflow_master
  • airflow documentation에 접속
  • airflow 요소들이 담겨져있는 최신 docker-compose.yaml 파일 다운로드
    • $ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'

3. docker-compose.yaml 커스터 마이징

  • 주요 부분들 위주로 설명 하겠습니다.
version: '3'  # docker-compose 버전입니다. 작성일 기준으로 3.0을 권장합니다.
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:airflow@mysql:3306/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqlconnector://airflow:airflow@mysql:3306/airflow   
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:airflow@mysql:3306/airflow 
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__WEBSERVER__SECRET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    mysql:
      condition: service_healthy
  • AIRFLOW__CORE__EXECUTOR : 어떤 Executor를 사용할지 지정해줍니다. 저는 Celery를 사용할 것입니다.
  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN : airflow에서 Database로 사용할 옵션 정보 입니다. 저는 mysql로 하였습니다.
  • AIRFLOW__CORE__SQL_ALCHEMY_CONN : Airflow가 해당 정보의 데이터베이스를 인지합니다.
  • AIRFLOW__CELERY__RESULT_BACKEND : Celery Worker가 Airflow metaStore와 통신하기위해 설정해주는 정보 입니다.
  • AIRFLOW__CELERY__BROKER_URL : broker에 대한 airflow 설정정보 입니다. 이 설정정보로 스케줄러가 큐에 메시지를 보낼 수 있게 됩니다. 저는 redis를 연결해주었습니다.
  • AIRFLOW__CORE__FERNET_KEY : FERNET_KEY는 db 안에서의 암호화된 통신을 위한 key로 Master와 Worker가 동일한 값으로 지정해야 합니다.
  • AIRFLOW__WEBSERVER__SECRET_KEY : Airflow의 웹 서버인 Flask가 사용할 session secret key입니다. 이 값은 Master와 Worker가 동일한 값으로 지정해야 합니다.
  • AIRFLOW__CORE__LOAD_EXAMPLES : 이 값을 True로 하면 예제 Dags이 생성됩니다. 저는 이 값을 False로 설정했습니다.
  • _PIP_ADDITIONAL_REQUIREMENTS : airflow의 pip requirements를 설치해줍니다.

services: 
  mysql: # db는 mysql로 커스터 마이징 했습니다.			
    container_name: mysql
    image: mysql:5.7 	# mysql5.7버전 공식 이미지를 가져옵니다.
    ports:				# port를 열어 줍니다.  
      - 3306:3306
    environment:		# db 로그인 설정을 해줍니다.
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: airflow
      MYSQL_PASSWORD: airflow
      MYSQL_DATABASE: airflow
    cap_add:
      - SYS_NICE
    volumes:			# db를 마운트 해줍니다.
      - mysql-db-volume:/var/lib/mysql
    command: # 명령어 실행
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
      - --default-authentication-plugin=mysql_native_password
      - --explicit_defaults_for_timestamp=1
      - --lower-case-table-names=1
      - --sql-mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
    healthcheck:
            test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
            timeout: 20s
            retries: 10
  • airflow 공식 홈페이지에서는 postgresql을 사용하지만 저는 이 부분을 mysql로 커스터 마이징 했습니다.

  redis:	# mq를 담당할 redis입니다.
    image: redis:latest	# 레디스 최신버전 공식 이미지를 가져옵니다.
    ports:
      - 6379:6379	# port를 열어 줍니다.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
  • mq로 사용할 redis 입니다.
  • 6379 포트를 열어줍니다.

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  • airflow UI를 구성하는 webserver 부분 입니다.
  • 8080 포트를 열어줍니다.
  • redis, mysql, airflow-init에 의존성을 갖고 있습니다.

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  • airflow로 할당된 work들을 스케쥴링 해주는 scheduler입니다.
  • redis, mysql, airflow-init에 의존성을 갖고 있습니다.

  #airflow-worker: # 실제 작업을 수행하는 worker를 띄우는 컨테이너입니다. 
  #  <<: *airflow-common
  #  command: celery worker
  #  healthcheck:
  #    test:
  #      - "CMD-SHELL"
  #      - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
  #    interval: 10s
  #    timeout: 10s
  #    retries: 5
  #  environment:
  #    <<: *airflow-common-env
  #    # Required to handle warm shutdown of the celery workers properly
  #    # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
  #    DUMB_INIT_SETSID: "0"
  #  restart: always
  #  depends_on:
  #    <<: *airflow-common-depends-on
  #    airflow-init:
  #      condition: service_completed_successfully
  • airflow의 실질적 일꾼 worker 입니다.
  • 저는 master와 worker 서버를 분리하기 위해 이 부분을 주석 처리했습니다.
  • worker는 아래에서 Dockerfile로 따로 작성할 것 입니다.
  • 만약 한 서버에서 worker도 같이 띄우고 싶다면 이 부분의 주석을 제거하고
  • docker-compose up -d --scale airflow-worker=(실행 할 worker의 갯수)를 해주시면 됩니다.

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash

    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
  
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources
  • 서비스를 시작해주는 airflow-init 부분 입니다.

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  mysql-db-volume:
  • celery worker들의 상황을 모니터링하는 web ui인 flower 입니다.
  • 5555 포트를 열어줍니다.
  • redis, mysql, airflow-init에 의존성을 갖고 있습니다.
전체 docker-compose.yaml 소스 코드
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:airflow@mysql:3306/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqlconnector://airflow:airflow@mysql:3306/airflow   
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:airflow@mysql:3306/airflow 
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__WEBSERVER__SECRET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    mysql:
      condition: service_healthy

services:
  mysql:
    container_name: mysql
    image: mysql:5.7
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: airflow
      MYSQL_PASSWORD: airflow
      MYSQL_DATABASE: airflow
    cap_add:
      - SYS_NICE
    volumes:
      - mysql-db-volume:/var/lib/mysql
    command: # 명령어 실행
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
      - --default-authentication-plugin=mysql_native_password
      - --explicit_defaults_for_timestamp=1
      - --lower-case-table-names=1
      - --sql-mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
    healthcheck:
            test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
            timeout: 20s
            retries: 10

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  #airflow-worker:
  #  <<: *airflow-common
  #  command: celery worker
  #  healthcheck:
  #    test:
  #      - "CMD-SHELL"
  #      - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
  #    interval: 10s
  #    timeout: 10s
  #    retries: 5
  #  environment:
  #    <<: *airflow-common-env
  #    # Required to handle warm shutdown of the celery workers properly
  #    # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
  #    DUMB_INIT_SETSID: "0"
  #  restart: always
  #  depends_on:
  #    <<: *airflow-common-depends-on
  #    airflow-init:
  #      condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash

    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version

    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  mysql-db-volume:

4. docker-compose 실행

  • docker-compose.yaml 파일을 받은 디렉토리로 이동합니다.
    $ cd ~/airflow_master
  • airflow-init을 먼저 실행시킵니다.
    $ sudo docker compose up airflow-init
  • 나머지 image들도 전부 실행시켜줍니다.
    $ sudo docker compose up -d
  • 마지막으로 flower를 실행시켜줍니다.
    $ sudo docker compose up -d flower

5. FERNET_KEY 및 SECRET_KEY 생성

  • airflow-scheduler 컨테이터에 접속합니다.

    $ docker ps # airflow-scheduler 컨테이너 이름 확인
    
    $ docker exec -it (airflow-scheduler 컨테이터 이름) bash
  • python에 접속해서 다음 커맨드로 FERNET_KEY를 획득 합니다.

    $ python
    >>> from cryptography.fernet import Fernet 
    >>> FERNET_KEY = Fernet.generate_key().decode() 
    >>> print(FERNET_KEY)
    
    46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
  • python에 접속해서 다음 커맨드로 SECRET_KEY를 획득 합니다.

    $ python
    >>> import os 
    >>> print(os.urandom(16))
    
    b'\xd0;G\xd1g()\xacU\x0ed\xd7\xban\xc8\xb0
  • 획득한 FERNET_KEY와 SECRET_KEY를 위의 docker-compose.yaml의 AIRFLOW__CORE__FERNET_KEYAIRFLOW__WEBSERVER__SECRET_KEY에 각각 집어 넣습니다.

  • docker-compose.yaml의 변경 사항을 컨테이너에 적용합니다.

    $ sudo docker compose up --build --force-recreate -d airflow-init
    $ sudo docker compose up --build --force-recreate -d 
    $ sudo docker compose up --build --force-recreate -d flower
    • --build: Build images before starting containers -> 변경된 이미지를 다시 build
    • --force-recreate: Recreate containers. docker-compose up을 하면 변경된 사항을 적용하여 컨테이너를 재생성하지만 up을 했을때에도 변경이 적용이 안되는 경우에 해당 옵션을 주어보자.
    • -d: Run containers in the background

Worker Node 설치 (Dockerfile)

1. 사전 작업

  • Worker를 구성할 서버에서 Dockerfile을 구성할 디렉토리를 생성해줍니다.

    $ mkdir ~/airflow_worker ~/airflow_worker/dags ~/airflow_worker/log ~/airflow_worker/plugins
    
    $ chmod 777 -R ~/airflow_worker 

2. Dockerfile 생성

  • 위에서 생성한 Dockerfile을 생성 할 디렉토리에 접속
    $ cd ~/airflow_worker
  • vi로 Dockerfile 생성
    $ vi Dockerfile
Dockerfile 소스 코드
FROM ubuntu:latest

# env
ENV AIRFLOW_HOME="/root/airflow"
ENV AIRFLOW__WORKER__NAME="worker_node"

ENV AIRFLOW__MASTER__IP="Master IP"
ENV AIRFLOW__MYSQL__HOST=${AIRFLOW__MASTER__IP}:3306
ENV AIRFLOW__REDIS__HOST=${AIRFLOW__MASTER__IP}:6379
ENV AIRFLOW__CORE__EXECUTOR="CeleryExecutor"

ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="mysql+mysqldb://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN="mysql+mysqlconnector://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CELERY__RESULT_BACKEND="db+mysql://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CELERY__BROKER_URL="redis://:@${AIRFLOW__REDIS__HOST}/0"
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER="${AIRFLOW_HOME}/log"
ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY="${AIRFLOW_HOME}/log/scheduler"
ENV AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION="${AIRFLOW_HOME}/log/dag_processor_manager/dag_processor_manager.log"
ENV AIRFLOW__CORE__HOSTNAME_CALLABLE="airflow.utils.net.get_host_ip_address"
ENV AIRFLOW__CORE__FERNET_KEY=""
ENV AIRFLOW__WEBSERVER__SECRET_KEY=""
ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION="true"
ENV AIRFLOW__CORE__LOAD_EXAMPLES="false"
ENV AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth"
ENV _PIP_ADDITIONAL_REQUIREMENTS="${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-google}"

# python3.7 설치 및 pip 설치
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:deadsnakes/ppa
RUN apt-get update && apt-get install -y python3.7 python3-pip
RUN apt-get install -y python3.7-distutils
RUN python3.7 -m pip install pip
RUN apt-get update && apt-get install -y python3-distutils python3-setuptools
RUN python3.7 -m pip install pip --upgrade pip
RUN apt-get install python3.7-dev libmysqlclient-dev gcc -y
RUN python3.7 -m pip install cffi

# install airflow
RUN pip install apache-airflow[celery]==2.5.0 
RUN pip install pymysql
RUN pip install psycopg2-binary 
RUN pip install Redis
RUN pip install mysqlclient
RUN airflow db init
RUN mkdir ${AIRFLOW_HOME}/dags
RUN mkdir ${AIRFLOW_HOME}/plugins

# install vim
RUN apt-get install -y vim

# healthcheck
HEALTHCHECK --interval=10s --timeout=10s --retries=5 CMD airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"
  • Master Node에서 설정한 것들과 대부분의 환경변수가 겹치므로 중요한 것들만 설명하겠습니다.
  • AIRFLOW__MASTER__IP : Master Node의 IP주소를 입력해줍니다.
  • AIRFLOW__CORE__HOSTNAME__CALLABLE : Master Node의 IP주소를 가져오게 하는건데 바로 IP주소를 입력하는것이 아닌 위와같은 방식으로 입력해야 인식이 됩니다. default는 socket.getfqdn으로 현재 컴퓨터의 Hostname을 입력하게됩니다. 해당 값이 세팅을 안할경우 Master Node Webserver에서 WorkerNode의 Log를 볼 수 없습니다.
  • AIRFLOW__CORE__FERNET_KEY : Master Node 설치의 5. 에서 얻은 FERNET_KEY 입력
  • AIRFLOW__WEBSERVER__SECRET_KEY : Master Node 설치의 5. 에서 얻은 SECRET_KEY 입력
  • AIRFLOW__LOGGING_BASE_LOG_FOLDER : Master 에서 Worker의 로그를 가져올때 logs폴더를 보는것이 아닌 log폴더를 보고있습니다. 그래서 위와같이 수정했고 그밑에 LOG_LOCATION, LOG_DIRECTORY 두개도 동일하게 logs에서 log로 바꿔주었습니다.

3. Dockerfile 실행

  • Dockerfile이 위치한 디렉토리로 이동합니다.
    $ cd ~/airflow_worker
  • Dockerfile을 build 합니다.
    $ docker build . -t worker # worker라는 이름의 이미지를 생성합니다. 
  • 생성한 이미지를 run 시켜줍니다.
    $ docker run -d -it --restart=always --name worker1 -p 8080:8080 \
    -v ~/airflow_worker/dags:/root/airflow/dags \
    -v ~/airflow_worker/plugins:/root/airflow/plugins \
    -v ~/airflow_worker/log:/root/airflow/log \
    -v /etc/localtime:/etc/localtime:ro -e TZ=Asia/Seoul \
    worker \
    airflow celery worker -H worker1 -q queue1
    • -name : worker X ( 저는 각각 서버에 따라 worker1,2,3 으로 만들 것 입니다.)
    • -p 8080:8080 : 8080 port를 열어줍니다.
    • -v ~/airflow_worker/dags:/root/airflow/dags : dags폴더를 마운트시켜줍니다.
    • -v ~/airflow_worker/plugins:/root/airflow/plugins : plugins폴더를 마운트시켜줍니다.
    • -v ~/airflow_worker/log:/root/airflow/log : log폴더를 마운트시켜줍니다.
    • -v /etc/localtime:/etc/localtime:ro -e TZ=Asia/Seoul : Timeline을 Asia/Seoul로 지정해줍니다.
    • airflow celery worker -H worker1 -q queue1 : airflow celery를 실행시켜줍니다. 저는 예시로 worker1과 queue1 이라고 하였습니다.
    • -H : celery worker 이름
    • -q : queue 이름

flower 결과 확인

  • (Master Node IP):5555 로 접속하면 아래와 같이 Master NodeWorker Node가 연결된 것을 볼 수 있습니다.

Reference

profile
Data Engineer

5개의 댓글

comment-user-thumbnail
2023년 2월 22일

내용 잘 보았습니다.
근데 한가지 질문이 있어 댓글남깁니다.

글 마지막에 있는 사진에 master / worker 이렇게 2가지의 celery가 있는데
master는 어떤 역활을 하는 celery인가요?

master설정에 있는 docker-compose 파일에는 설정하는 곳이 안보이는 것 같아서요...ㅠㅠ
그리고 한 서버 내에서 worker를 2개 이상 실행하려면 docker-compose 시 scale 옵션으로 늘리라 해주셨는데
이렇게 하면 worker가 헬스 체크하다가 죽어버리는데 이런 경험이 있으실까요??

1개의 답글
comment-user-thumbnail
2024년 3월 16일

안녕하세요. 두대의 pc로 master와 worker node를 구축하였습니다.
근데 dags가 master에 있는데. worker node에는 없다고 worker node가 task를 할당 받아 진행할 때 error가 뜹니다. 제 생각엔 master에 worker가 cluster로 등록될 때 master에 있는 file을 어떻게 잘 하면... mount해서 받아올 수 있을 것 같은데.. 방법이 있을까요??

요약하자면.
master와 worker가 다른 pc에 설치되어있는 상태에서 worker가 dags를 master로 부터 공유받아올 수 있는 방법이 궁금합니다.

1개의 답글
comment-user-thumbnail
2024년 4월 18일

내용 감사합니다.

Master IP라는 것은 어떤 걸까요 ??

답글 달기