So far I had been using the LocalExecutor. The alternatives are the Celery Executor and the Kubernetes Executor; of these I chose the Celery Executor and decided to build the cluster by splitting it into a Master Node and Worker Nodes. With the Celery Executor, tasks are handed to a message broker and Celery workers pick them up and execute them. The advantage is that the number of workers can be scaled out; the downsides are that the message broker has to be managed separately and the worker processes need their own monitoring.
airflow celery worker -H worker_name -q queue_name
Each worker is started with the command above (a short DAG sketch after the list below shows how a task is routed to such a queue). The components communicate as follows:
1. Web server –> Workers : fetches task execution logs
2. Web server –> DAG files : reveals the DAG structure
3. Web server –> Database : fetches the status of tasks
4. Workers –> DAG files : reveal the DAG structure and execute the tasks
5. Workers –> Database : get and store information about connection configuration, variables, and XCom
6. Workers –> Celery's result backend : save the status of tasks
7. Workers –> Celery's broker : store commands for execution
8. Scheduler –> DAG files : reveals the DAG structure and executes the tasks
9. Scheduler –> Database : stores DAG runs and related tasks
10. Scheduler –> Celery's result backend : gets information about the status of completed tasks
11. Scheduler –> Celery's broker : puts the commands to be executed
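Since each worker listens on the queue given with -q, a task can be pinned to a particular worker by setting the queue argument on its operator. Below is a minimal sketch of that idea, not from the original post; the DAG id, dates, and queue name are placeholders chosen to match the command above.

# Minimal sketch: route one task to the queue a specific Celery worker listens on.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="queue_routing_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,                    # run only when triggered manually
    catchup=False,
) as dag:
    BashOperator(
        task_id="runs_on_named_worker",
        bash_command="echo 'picked up by the worker listening on queue_name'",
        queue="queue_name",           # must match the -q value the worker was started with
    )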
Server | master | worker01 | worker02 | worker03 |
---|---|---|---|---|
OS | centos7 | centos7 | centos7 | centos7 |
Disk Size | 1000G | 1000G | 1000G | 1000G |
Memory | 32G | 16G | 16G | 16G |
Processors | 12 | 12 | 12 | 12 |
※ These three pieces of software (Docker, Docker Compose, Python) must be installed on every server.
$ mkdir ~/airflow_master ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins
$ sudo chmod 777 ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins
$ sudo chown (user):root ~/airflow_master/dags ~/airflow_master/logs ~/airflow_master/plugins
./dags
: where DAG files are placed
./logs
: where task and scheduler logs are written
./plugins
: where custom plugins are placed (a minimal plugin sketch follows below)
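For reference, anything dropped into ./plugins is picked up by Airflow's plugin manager: a plugin is just a Python module that exposes an AirflowPlugin subclass. A minimal sketch, with a hypothetical module name, class name, and macro:

# plugins/example_plugin.py -- hypothetical minimal custom plugin
from airflow.plugins_manager import AirflowPlugin


def days_to_seconds(days):
    # Hypothetical helper exposed to Jinja templates as a macro.
    return days * 24 * 60 * 60


class ExamplePlugin(AirflowPlugin):
    name = "example_plugin"        # the name Airflow registers the plugin under
    macros = [days_to_seconds]     # extra macros made available to templates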
$ cd ~/airflow_master
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'
version: '3' # Compose file format version; 3.0 is recommended as of this writing.
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:airflow@mysql:3306/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqlconnector://airflow:airflow@mysql:3306/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:airflow@mysql:3306/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__WEBSERVER__SECRET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
mysql:
condition: service_healthy
AIRFLOW__CORE__EXECUTOR
: specifies which Executor to use. I will use Celery.
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
: the connection string for the database Airflow uses as its metastore. I chose MySQL.
AIRFLOW__CORE__SQL_ALCHEMY_CONN
: lets Airflow recognize the same database; kept for backward compatibility with Airflow < 2.3.
AIRFLOW__CELERY__RESULT_BACKEND
: the connection the Celery workers use to talk to the Airflow metastore.
AIRFLOW__CELERY__BROKER_URL
: the broker settings for Airflow; this is what allows the scheduler to push messages onto the queue. I connected Redis.
AIRFLOW__CORE__FERNET_KEY
: the key used to encrypt data stored in the database; the Master and Workers must be given the same value.
AIRFLOW__WEBSERVER__SECRET_KEY
: the session secret key used by Flask, the web framework behind the Airflow webserver. The Master and Workers must also share the same value.
AIRFLOW__CORE__LOAD_EXAMPLES
: when set to True, the example DAGs are created. I set it to False.
_PIP_ADDITIONAL_REQUIREMENTS
: additional pip requirements to install in the Airflow containers.
services:
  mysql: # I customized the metadata DB to use MySQL.
container_name: mysql
    image: mysql:5.7 # official MySQL 5.7 image
    ports: # expose the port
- 3306:3306
    environment: # DB login settings
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: airflow
MYSQL_PASSWORD: airflow
MYSQL_DATABASE: airflow
cap_add:
- SYS_NICE
    volumes: # mount the DB data directory
- mysql-db-volume:/var/lib/mysql
    command: # server options
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
- --default-authentication-plugin=mysql_native_password
- --explicit_defaults_for_timestamp=1
- --lower-case-table-names=1
- --sql-mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
timeout: 20s
retries: 10
  redis: # Redis serves as the message queue (broker).
    image: redis:latest # official latest Redis image
ports:
      - 6379:6379 # expose the port
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
#airflow-worker: # the container that runs the workers, which execute the actual tasks
# <<: *airflow-common
# command: celery worker
# healthcheck:
# test:
# - "CMD-SHELL"
# - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
# interval: 10s
# timeout: 10s
# retries: 5
# environment:
# <<: *airflow-common-env
# # Required to handle warm shutdown of the celery workers properly
# # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
# DUMB_INIT_SETSID: "0"
# restart: always
# depends_on:
# <<: *airflow-common-depends-on
# airflow-init:
# condition: service_completed_successfully
docker-compose up -d --scale airflow-worker=(number of workers to run)
To run more than one worker from this file, just run the command above.
  airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
mysql-db-volume:
After these customizations, the complete docker-compose.yaml looks like this:
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:airflow@mysql:3306/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqlconnector://airflow:airflow@mysql:3306/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:airflow@mysql:3306/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__WEBSERVER__SECRET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
mysql:
condition: service_healthy
services:
mysql:
container_name: mysql
image: mysql:5.7
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: airflow
MYSQL_PASSWORD: airflow
MYSQL_DATABASE: airflow
cap_add:
- SYS_NICE
volumes:
- mysql-db-volume:/var/lib/mysql
    command: # server options
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
- --default-authentication-plugin=mysql_native_password
- --explicit_defaults_for_timestamp=1
- --lower-case-table-names=1
- --sql-mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
timeout: 20s
retries: 10
redis:
image: redis:latest
ports:
- 6379:6379
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
#airflow-worker:
# <<: *airflow-common
# command: celery worker
# healthcheck:
# test:
# - "CMD-SHELL"
# - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
# interval: 10s
# timeout: 10s
# retries: 5
# environment:
# <<: *airflow-common-env
# # Required to handle warm shutdown of the celery workers properly
# # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
# DUMB_INIT_SETSID: "0"
# restart: always
# depends_on:
# <<: *airflow-common-depends-on
# airflow-init:
# condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
mysql-db-volume:
$ cd ~/airflow_master
$ sudo docker compose up airflow-init
$ sudo docker compose up -d
$ sudo docker compose up -d flower
Attach to the airflow-scheduler container.
$ docker ps # find the airflow-scheduler container name
$ docker exec -it (airflow-scheduler container name) bash
Open a Python shell and generate a FERNET_KEY with the following commands.
$ python
>>> from cryptography.fernet import Fernet
>>> FERNET_KEY = Fernet.generate_key().decode()
>>> print(FERNET_KEY)
46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
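For context, Airflow uses this Fernet key to symmetrically encrypt sensitive values (for example connection passwords) before writing them to the metadata database, which is why the Master and the Workers must share the same key. A quick illustration in the same Python shell; the secret string is just a made-up example.

>>> from cryptography.fernet import Fernet
>>> f = Fernet(FERNET_KEY)                        # the key generated above
>>> token = f.encrypt(b"my-connection-password")  # hypothetical secret value
>>> f.decrypt(token)
b'my-connection-password'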
Open a Python shell and generate a SECRET_KEY with the following commands.
$ python
>>> import os
>>> print(os.urandom(16))
b'\xd0;G\xd1g()\xacU\x0ed\xd7\xban\xc8\xb0
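One caveat, not from the original post: os.urandom() returns raw bytes that may not paste cleanly into a YAML file, so a printable hex string works just as well for the Flask secret key.

>>> import secrets
>>> print(secrets.token_hex(16))  # example output below; yours will differ
3f9c0a27b4e18d56c2a1f0e9d87b6a45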
Put the FERNET_KEY and SECRET_KEY obtained above into AIRFLOW__CORE__FERNET_KEY and AIRFLOW__WEBSERVER__SECRET_KEY in the docker-compose.yaml, respectively.
Apply the docker-compose.yaml changes to the containers.
$ sudo docker compose up --build --force-recreate -d airflow-init
$ sudo docker compose up --build --force-recreate -d
$ sudo docker compose up --build --force-recreate -d flower
--build
: Build images before starting containers -> rebuilds the changed image.
--force-recreate
: Recreate containers. docker-compose up normally recreates containers with the changes applied, but if the changes do not take effect even after an up, try adding this option.
-d
: Run containers in the background.
On each server that will host a Worker, create a directory for the Dockerfile.
$ mkdir ~/airflow_worker ~/airflow_worker/dags ~/airflow_worker/log ~/airflow_worker/plugins
$ chmod 777 -R ~/airflow_worker
$ cd ~/airflow_worker
$ vi Dockerfile
FROM ubuntu:latest
# env
ENV AIRFLOW_HOME="/root/airflow"
ENV AIRFLOW__WORKER__NAME="worker_node"
ENV AIRFLOW__MASTER__IP="Master IP"
ENV AIRFLOW__MYSQL__HOST=${AIRFLOW__MASTER__IP}:3306
ENV AIRFLOW__REDIS__HOST=${AIRFLOW__MASTER__IP}:6379
ENV AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="mysql+mysqldb://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN="mysql+mysqlconnector://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CELERY__RESULT_BACKEND="db+mysql://airflow:airflow@${AIRFLOW__MYSQL__HOST}/airflow"
ENV AIRFLOW__CELERY__BROKER_URL="redis://:@${AIRFLOW__REDIS__HOST}/0"
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER="${AIRFLOW_HOME}/log"
ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY="${AIRFLOW_HOME}/log/scheduler"
ENV AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION="${AIRFLOW_HOME}/log/dag_processor_manager/dag_processor_manager.log"
ENV AIRFLOW__CORE__HOSTNAME_CALLABLE="airflow.utils.net.get_host_ip_address"
ENV AIRFLOW__CORE__FERNET_KEY=""
ENV AIRFLOW__WEBSERVER__SECRET_KEY=""
ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION="true"
ENV AIRFLOW__CORE__LOAD_EXAMPLES="false"
ENV AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth"
ENV _PIP_ADDITIONAL_REQUIREMENTS="${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-google}"
# install Python 3.7 and pip
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:deadsnakes/ppa
RUN apt-get update && apt-get install -y python3.7 python3-pip
RUN apt-get install -y python3.7-distutils
RUN python3.7 -m pip install pip
RUN apt-get update && apt-get install -y python3-distutils python3-setuptools
RUN python3.7 -m pip install pip --upgrade pip
RUN apt-get install python3.7-dev libmysqlclient-dev gcc -y
RUN python3.7 -m pip install cffi
# install airflow
RUN pip install apache-airflow[celery]==2.5.0
RUN pip install pymysql
RUN pip install psycopg2-binary
RUN pip install Redis
RUN pip install mysqlclient
RUN airflow db init
RUN mkdir ${AIRFLOW_HOME}/dags
RUN mkdir ${AIRFLOW_HOME}/plugins
# install vim
RUN apt-get install -y vim
# healthcheck
HEALTHCHECK --interval=10s --timeout=10s --retries=5 CMD airflow jobs check --job-type SchedulerJob --hostname "${HOSTNAME}"
AIRFLOW__MASTER__IP
: enter the Master Node's IP address.
AIRFLOW__CORE__HOSTNAME_CALLABLE
: makes the worker report its IP address. Do not put the IP address in directly; it has to be set to a callable as above to be recognized. The default is socket.getfqdn, which reports the machine's hostname, and if this value is not set the Master Node's webserver cannot display the Worker Node's logs.
AIRFLOW__CORE__FERNET_KEY
: enter the FERNET_KEY obtained in step 5 of the Master Node installation.
AIRFLOW__WEBSERVER__SECRET_KEY
: enter the SECRET_KEY obtained in step 5 of the Master Node installation.
AIRFLOW__LOGGING__BASE_LOG_FOLDER
: when the Master fetches a Worker's logs it looks at a log folder rather than logs, so I changed it as above, and the LOG_LOCATION and LOG_DIRECTORY settings below it were likewise changed from logs to log.
$ cd ~/airflow_worker
$ docker build . -t worker # build an image named worker
$ docker run -d -it --restart=always --name worker1 -p 8080:8080 \
-v ~/airflow_worker/dags:/root/airflow/dags \
-v ~/airflow_worker/plugins:/root/airflow/plugins \
-v ~/airflow_worker/log:/root/airflow/log \
-v /etc/localtime:/etc/localtime:ro -e TZ=Asia/Seoul \
worker \
airflow celery worker -H worker1 -q queue1
--name
: the container name; I will create them as worker1, worker2, worker3, one per server.
-p 8080:8080
: opens port 8080.
-v ~/airflow_worker/dags:/root/airflow/dags
: mounts the dags folder.
-v ~/airflow_worker/plugins:/root/airflow/plugins
: mounts the plugins folder.
-v ~/airflow_worker/log:/root/airflow/log
: mounts the log folder.
-v /etc/localtime:/etc/localtime:ro -e TZ=Asia/Seoul
: sets the timezone to Asia/Seoul.
airflow celery worker -H worker1 -q queue1
: starts the Airflow Celery worker; as an example I used worker1 and queue1 (a quick broker-side check follows this list).
-H
: the Celery worker name.
-q
: the queue name.
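As an aside that is not from the original post, one way to confirm that the new worker has registered with the Redis broker is to ping it from a Python shell inside any Airflow container on the Master, assuming the Airflow 2.5 setup above:

>>> from airflow.executors.celery_executor import app
>>> app.control.inspect().ping()    # lists the Celery workers that respond
{'celery@worker1': {'ok': 'pong'}}  # example reply; the node name reflects the -H value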
You can see that the Master Node and the Worker Node are now connected.

Hello. I set up a master and a worker node on two PCs.
But the DAGs are on the master, and when the worker node is assigned a task it throws an error saying the DAG is not on the worker. I feel like when the worker registers with the master as part of the cluster there should be some way to mount or pull the files that are on the master... is there a way to do this?
To summarize:
with the master and worker installed on different PCs, I would like to know how the worker can get the DAGs shared from the master.

Thanks for the write-up.
I am leaving a comment because I have one question.
In the picture at the end of the post there are two Celery workers, master and worker;
what role does the master Celery worker play?
I can't seem to find where it is configured in the docker-compose file of the master setup...
Also, you said to increase the number of workers with the scale option at docker-compose time to run two or more workers on one server,
but when I do that the worker dies during its health check; have you run into this?