복잡한 워크플로우를 프로그래밍 방식으로 작성해서 스케줄링하고 모니터링 할 수 있는 플랫폼
어떠한 스크립트들을 스케줄링 할때 crontab, cloudwatch 등을 사용하는데 스크립트끼리 서로의 의존성이 생기게 되면 컨트롤하기 어렵고 문제가 생겼을 시에 디버깅을 확인하고 어디서 잘못됐는지 체킹하는 작업들이 필요해진다.
Airflow에서는 서로에 대한 의존성을 표현할수 있고 스크립트가 실패했을때 알람을 보내 확인하고 쉽게 수정 후 재시도 할 수 있고, 이전 날짜 작업이 실패했을때 그 날짜만 다시 실행하는 등 많은 문제를 해결할 수 있다.
크게 세가지 카테고리를 나눠서 생각할 수 있습니다.
airflow에대한 체크할 사항들
1. 환경 셋업이 시간과 노력이 많이 들어간다. 그렇기때문에 스크립트로 만들어서 사용하기도함
2. 워커의 스케일링을 하기위해 계속 관리를 해줘야한다.
3. 시스템 보안관리가 필요하다.
4. 수만은 디펜던시에대한 보안이나 이슈를 관리하고 학습하는데 걸리는시간
5. maintenance를 지속적으로 해줘야한다.
DAG는 유향 비순환 그래프라고 하며 에어플로우의 워크플로우는 python을 사용하여 작성할 수 있다. 하나의 DAG안에는 한개 이상의 TASK가 있으며, Task는 실제 실행시키는 작업
DAG 파일 = 워크플로우
python 스크립트로 작성
Operator로 task 정의
DAG 등록


에어플로우는 Operator를 사용하여 python, bash, aws, slack 등 다양한 동작을 실행시킬 수 있다. 예를 들어 BashOperator를 사용하면 bash 스크립트를 실행시킬 수 있게 되는것입니다.
작업 실행을 시켜주는 실행기, Executor의 종류를 실행방법에 따라 선택해서 사용하면 된다.
Airflow는 기본값으로 sqlite를 사용하며 이 경우 Executor는 SequentialExcutor를 사용하게 된다.
그리고 저는 파이썬 가상환경에 설치할 것이기 때문에 다음명령어로 가상환경 설치및 실행 해서 작업을 진행했습니다. (참고)
pip3 install virtualenv
python3 -m virtualenv airflow // venv파일 생성
source ./airflow/bin/activate // 가상환경 실행
참고 사이트
# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=~/airflow
# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.1/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone
# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
# Enable the example_bash_operator dag in the home page
순서대로 명령어 실행하면됩니다.
pip install wheel
2. 환경변수를 venv환경이 아닌 시스템에 등록을 해야합니다.
# airflow 예제 제거하기
load_example = True
# DAG 업데이트 시간 조정하기 (UI), 이 시간을 0으로 둘 경우 아주 많은 CPU를 사용하게 된다.
min_file_process_interval = 60
dag_dir_list_interval = 30
# 데이터베이스 연결하기
sql_alchemy_conn = mysql+pymysql://db이름:비밀번호@localhost/airflow
# Executor 설정하기
# SequentialExecutor(Default)를 사용하면 한 번에 하나의 작업만 처리할 수 있다.
executor = LocalExecutor
parallelism = 64
dag_concurrency = 32
mysql commend에서 다음명령어 실행
SET explicit_defaults_for_timestamp=1;
airflow를 실행하기전 DB초기화
airflow db init
초기화가 성공한 후 미리 만들어 뒀던 airflow db를 들어가보면 meta data를 저장하는 테이블들이 생성돼 있을것입니다.
airflow users create --username admin --firstname airflow --lastname 25g --role Admin --email fj2008@naver.com -p 1234
airflow webserver --port 8080