AWS EC2를 통한 Airflow CeleryExcutor 구축기 1 (VPC, RDS, ElastiCache)
위 포스팅에 이어서 EC2에 대한 부분들을 정리해보겠습니다.
간단하게 표현하자면 보안을 위해 private subnet에 접근할 수 있도록 Bastion Host를 생성해주고, 이후 데이터 파이프라인이 동작하기 위한 Airflow 시스템을 EC2로 생성했습니다.
추가적으로, 분산 처리를 위한 CeleryExecutor 관련 설정 값들과 Airflow worker의 AutoScaling을 위한 AutoScalingGroup 설정까지 다뤄보겠습니다.
Private subnet에 위치한 Airflow 서버에 접근하기 위해
Public subnet에 구축한 Bastion Host입니다.
ubuntu 22.04 버전을 사용하였고,
단순히 터널링 용도로만 사용할 것이기 때문에 t2.micro로 최소사양 설정을 해주었습니다.
그리고 IP가 계속해서 변동되지 않게하기 위해,
Elastic IP 설정까지 해주었습니다.

Private subnet 내에 EC2 인스턴스를 생성하여 그 안에서 Airflow 서버를 구성하였습니다.
주최 측에서 허용한 최대치인 t3.medium으로 인스턴스를 생성해주었고요.
( t3.medium의 메모리 크기는 4GB )
인스턴스의 IAM 역할은 아래와 같이 부여해주었습니다.

또, 위의 RDS와 연결을 해주었기에 ec2-rds-4라는 보안 그룹이 추가된 것을 확인할 수 있습니다.

여기서 ec2-rds-4는 인바운드 규칙이 없고,
아웃바운드 규칙에 해당 RDS만을 허용하는 포트가 개방되어있습니다.

-> 우분투 자체 프로세스 기반 Airflow 구축 시도 실패
EC2 환경 :

# mac의 경우
# EC2 생성할 때 사용한 private key를 통해 원격 호스트 접속
ssh -i private-key.pem ubuntu@{baston_host_ip}
# 윈도우의 경우
# putty를 사용
sudo su -
cd /home/ubuntu/.ssh
# EC2 생성할 때 사용한 private key의 내용을 복사하여 baston host 내에서 생성
vi private-key.pem
# 읽기 권한 부여
chmod 400 private-key.pem
# 원격 호스트 접속
ssh -i private-key.pem ubuntu@{ec2_airflow_ip}
# Add Docker's official GPG key:
sudo apt-get update
sudo apt-get install ca-certificates curl gnupg
sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg
# Add the repository to Apt sources:
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
# Install the Docker packages
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
# Verify that the Docker Engine installation is successful
sudo docker run hello-world
# Add user to the Docker group (필수!)
sudo usermod -aG docker ubuntu
참고문서: https://docs.docker.com/engine/install/ubuntu/#install-using-the-repository
# docker-compose 설치
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 접근 권한 부여
sudo chmod +x /usr/local/bin/docker-compose
# 심볼릭 링크 설정
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
# 설치 여부 확인
docker-compose --version
# 권한 부여
sudo chmod 666 /var/run/docker.sock
# psql 사용을 위한 client 설치
sudo apt-get install -y postgresql-client
# 설치 확인
psql --version
# RDS 접속
psql -U postgres -d postgres -h {RDS_endpoint} -p 5432
# Airflow 계정 및 DB 생성
postgres=# CREATE USER airflow PASSWORD 'airflow';
postgres=# CREATE DATABASE airflow;
# Airflow 계정에 권한 부여
postgres=# GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
# 나가기
postgres=# \q
sudo apt update
# redis-cli를 사용하기 위해 redis-tools를 설치
sudo apt-get install redis-tools
# Elasticache - redis에 접속
redis-cli -h {Elasticache_endpoint에 접속, not port}
# 나가기
exit
Airflow 서비스를 ubuntu 사용자로 진행하면 보안 상 취약할 수 있으므로, 별도의 airflow 사용자 계정을 생성하여 진행하겠습니다.
sudo groupadd airflow
# airflow 계정의 홈디렉토리는 /var/lib/airflow로 설정
sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
# Docker 설치 시에 진행했던 과정으로,
# Docker group에 airflow 계정을 추가해줘야합니다. (필수!)
sudo usermod -aG docker airflow
# airflow 계정으로 전환
sudo su airflow
Airflow 설치 및 Docker-compose 실행은 Airflow 계정 내에서 진행이 되야합니다.
# Airflow docker-compose.yaml
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
# airflow user 설정
# ( 컨테이너 내부에서 Airflow 관련 파일과 폴더의 소유자를 지정하기 위함 )
echo -e "AIRFLOW_UID=$(id -u)" > .env
# 관련 폴더 생성
mkdir -p ./dags ./logs ./plugins ./config ./data
# celery executor 사용을 위한 값 설정
vi docker-compose.yaml
Airflow에 dependencies를 추가하기 위해 requirements.txt를 만들어주었습니다.
requirements 내부에는 pip install을 할 패키지들을 담았습니다.
docker-compose를 실행할 때 이 dependencies를 적용한 Docker 이미지를 base로 사용해야하므로,
requirements를 설치하는 Dockerfile을 새로 정의해주었습니다.
requirements.txt
# Airflow 2.5.1과 호환성 여부를 확인하여 버전 지정
apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-redis==3.1.0
boto3==1.26.51
boto==2.49.0
botocore==1.29.51
celery==5.2.7
flower==1.2.0
gunicorn==20.1.0
kombu==5.2.4
imbalanced-learn==0.8.1
selenium
scikit-learn
imblearn
pymysql
Dockerfile
# Airflow 2.5.1에 dependencies를 적용한 이미지를 생성
FROM apache/airflow:2.5.1
COPY requirements.txt /requirements.txt
RUN pip install --user --upgrade pip
RUN pip install -r /requirements.txt
Celery Executor를 사용하므로 Worker를 여러 개를 사용할 수 있습니다.
또한 이 Worker들을 별도의 분리된 환경에서 실행시킬 수도 있습니다.
그러나, 이 여러 곳에 있는 Worker들을 총 집계하고 지시를 내릴 수 있는 메인 서버는 하나여야겠죠.
그 메인 서버에 대한 docker 구성을 docker-compose-main.yaml이라하고
worker를 scale out할 수 있는 docker 구성을 docker-compose-worker.yaml이라 하겠습니다.
docker-compose-main.yaml에 들어갈 서비스들은 다음과 같습니다.
Airflow init, scheduler, trigger, webserver, flower
이렇게 worker를 완전히 분리해도 괜찮지만 저는 Swap Memory 설정 후 메모리에 약간의 여유가 있었기에 여기에 Worker 한 개와 특정 task에 필요한 Chrome driver도 추가해주었습니다.
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:2.5.1}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
# Airflow webserver의 varaibles,connections 등의 Metadata를 저장하는 DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@rds_endpoint:5432/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@rds_endpoint:5432/airflow
# Celery Executor의 task state를 기록하여 다수의 Worker로 분산 처리를 가능하게 해주는 DB
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@rds_endpoint:5432/airflow
# Worker가 가져갈 수 있도록 task들을 담아두는 Queue
AIRFLOW__CELERY__BROKER_URL: redis://redis_endpoint:6379/0
# log들을 S3에 원격으로 연결
AIRFLOW__CORE__REMOTE_LOGGING: 'true'
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_conn
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: s3://s3_bucket_name
# DB 암호화
AIRFLOW__CORE__FERNET_KEY: '...'
# DAG 생성 중에는 정지
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
# DAG 예제 생성 X
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
# 실험적인 부분으로 사용을 권장하지 않음 -> pip install을 쉽게 해주는 환경 변수
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
# DAGs의 임시 파일 저장소
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
user: "${AIRFLOW_UID:-50000}:0"
services:
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 60s
retries: 5
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 60s
retries: 5
environment:
<<: *airflow-common-env
C_FORCE_ROOT: 'true'
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/
flower:
<<: *airflow-common
command: celery flower
#profiles:
#- flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 120s
retries: 5
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-worker:
condition: service_healthy
airflow-scheduler:
condition: service_healthy
airflow-triggerer:
condition: service_healthy
selenium:
container_name: remote_chromedriver
image: seleniarm/standalone-chromium:latest
ports:
- 4444:4444
restart: always
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
user: "${AIRFLOW_UID:-50000}:0"
주의사항 :
depends_on에 Redis와 Postgres가 빠졌으므로,
EC2 실행 시 RDS와 Elasticache가 먼저 동작하고 있음을 확인된 후에 실행해야합니다.
# Dockerfile을 기반으로 Docker image를 생성
docker build . --tag extending_airflow:2.5.1
docker compose -f docker-compose-main.yaml up airflow-init
docker compose -f docker-compose-main.yaml up -d
# flower가 에러가 났다면 아래 명령어로 별도 실행
docker compose -f docker-compose-main.yaml up -d flower
docker compose -f docker-compose-main.yaml down
어느 정도 시간 이후에
docker ps를 하여
healthy 상태가 된 것을 확인
DAG 상에서 Selenium을 동작시키려면 단순히 requirments로 등록해주는 경우,
Selenium을 사용하기 위해 크롬을 설치해줘야하는 부분에서 복잡한 문제가 발생합니다.
이와 같은 방법이 아닌 Docker container로 띄우는 방식을 팀원분이 알아내셔서 이를 반영했습니다.
docker-compose.yaml의 services에
selenium에 크롬 드라이버가 포함된 다중 아키텍쳐 이미지를 추가합니다.
# docker-compose.yaml에 selenium을 추가
services:
selenium:
container_name: remote_chromedriver
image: seleniarm/standalone-chromium:latest
ports:
- 4444:4444
restart: always
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
네트워크 매핑 :
VPC를 선택 후 외부에서 접근할 수 있도록 해야하므로 public 영역을 선택해줍니다.보안 그룹 :
프로젝트를 참여하는 팀원들의 IP만 접근할 수 있도록 허용해두었습니다.리스너 : HTTP 80포트
대상 그룹 :
인스턴스 선택 ->
airflow 작업 중인 EC2 인스턴스를 선택 ->
Webserver가 8080포트를 사용하므로 8080포트로 라우팅단, healthcheck 부분에서 webserver에 정상적으로 연결이 됐을시 http 200이 아닌 http 302가 반환되므로 302로 설정해주어야합니다.
즉, ALB로 외부의 80포트 -> private EC2 공간의 8080포트를 포트포워딩
마지막으로, ALB에 등록된 DNS 주소로 접속
flower
webserver와 동일한 방식으로 같은 ALB에서 리스너를 추가해 포트번호 81 -> 5555로 포트포워딩을 해주었습니다.
주의사항 :
해당 EC2 인스턴스의 인바운드 규칙에 8080포트가 열려있는지, 5555포트가 열려있는지 확인해야합니다. 저는 EC2 인스턴스에 5555포트가 안열려있는 것을 모르고 엉뚱하게 ALB가 문제인지 Flower가 문제인지 찾느라 정말 오랜 시간 헤맸습니다.
-> 관련 포스팅 : Flower Troubleshooting - EC2 Airflow 구축기
Celery Executor를 사용한 주 목적인 분산 처리를 하기 위해서는 다수의 worker가 필요합니다.
그러나, 수행해야 할 Task가 많지도 않은데 무턱대고 Worker만 많이 생성해두는 것은 상당히 비효율적일 것입니다.
따라서 수행해야할 Task가 많으면 자동으로 scale out하여 Worker를 늘리고
수행해야할 Task가 줄어들면 자동으로 scale in하여 Worker가 줄어들게 설정해줄 것입니다.
이를 위해 EC2 Auto Scaling Group 사용했습니다.
Auto Scaling Group을 만들기에 앞서,
분리된 환경에서 worker만 실행시키기 위한 docker-compose-worker.yaml을 작성하겠습니다.
Worker 뿐 아니라 특정 task에 필요한 Chrome driver도 추가해주었습니다.
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:2.5.1}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
# Airflow webserver의 varaibles,connections 등의 Metadata를 저장하는 DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@rds_endpoint:5432/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@rds_endpoint:5432/airflow
# Celery Executor의 task state를 기록하여 다수의 Worker로 분산 처리를 가능하게 해주는 DB
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@rds_endpoint:5432/airflow
# Worker가 가져갈 수 있도록 task들을 담아두는 Queue
AIRFLOW__CELERY__BROKER_URL: redis://redis_endpoint:6379/0
# log들을 S3에 원격으로 연결
AIRFLOW__CORE__REMOTE_LOGGING: 'true'
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_conn
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: s3://s3_bucket_name
# DB 암호화
AIRFLOW__CORE__FERNET_KEY: '...'
# DAG 생성 중에는 정지
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
# DAG 예제 생성 X
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
# 실험적인 부분으로 사용을 권장하지 않음 -> pip install을 쉽게 해주는 환경 변수
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
# DAGs의 임시 파일 저장소
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
user: "${AIRFLOW_UID:-50000}:0"
services:
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 60s
retries: 5
environment:
<<: *airflow-common-env
C_FORCE_ROOT: 'true'
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
selenium:
container_name: remote_chromedriver
image: seleniarm/standalone-chromium:latest
ports:
- 4444:4444
restart: always
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
user: "${AIRFLOW_UID:-50000}:0"