데이터를 다루는 일을 하게 되면서, 기존의 데이터 수집 프로세스를 유지보수하는 것뿐만 아니라 내가 직접 새로운 데이터를 수집할 일도 생겼다.
이 과정에서 대부분의 스크립트는 기존 프로세스의 내용을 참고하여 작성했는데, 내가 생각하기에 가장 개선이 필요한 부분은 crontab을 이용한다는 점이었다.
crontab을 이용하면 "주기적으로 프로세스를 실행"할 수 있지만, 프로세스 내 역할을 분리할 필요가 있었고 실행한 모든 프로세스에 대해 통합된 로그에서 오류를 찾아야 한다는 번거로움이 있었다.
Airflow는 스크립트 내에서 Task라는 단위로 업무를 분리할 수 있고, Task마다 실행 성공 여부와 함께 각각의 로그도 확인할 수 있다는 장점이 있어서 사용하기로 결심했다.
mkdir airflow && cd airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/[version]/docker-compose.yaml'
가장 먼저 airflow 디렉터리를 생성하고, Docker Compose 파일을 생성했다.
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
airflow의 하위 디렉터리를 생성하고 권한을 부여했다.
docker-compose up airflow-init
docker-compose up -d
airflow를 초기화하면 다음과 같은 화면을 확인할 수 있다.


기존 프로세스에서는 데이터를 비동기 수집하고, 수집한 데이터를 가공한 후에 적재하도록 작성했다. 하나로 통합된 데이터 수집, 가공, 적재 과정에서는 오류 발생 위치를 한 번에 파악하기 어려워 오류에 대한 모니터링 수준이 현저히 떨어졌고 오류 발견부터 수정까지 가는 과정에 번거로움이 있었다.
Airflow에서는 DAG(Directed Acyclic Graph)를 사용하는데, DAG는 작업을 순서에 맞게 진행할 수 있도록 한다.
1번 Task는 데이터를 수집한 후에 적재하도록, 2번 Task는 수집한 데이터를 가공하고 2차 적재하도록 작성했다.
스크립트 내에서 DAG를 정의할 때, Task 의존성에 대해 작성할 수 있다.
내가 작성한 DAG에서는 task1 >> task2라고 의존성에 대해 작성했는데, 이는 task1이 성공적으로 실행했을 경우에만 task2가 실행된다.
이처럼 DAG를 이용한다면 하나의 프로세스에서도 업무를 구분하여 실행할 수 있고, Task의 성공 여부에 따라 이후 Task 실행 여부를 판단하여 추가적인 오류 발생을 막을 수 있다.
airflow/
├── dags/
│ ├── dag.py # DAG 정의 파일
├── plugins/ # 커스텀 모듈 또는 플러그인 위치
│ ├── custom_module.py # 직접 작성한 모듈
├── logs/ # Airflow 로그 디렉터리
├── docker-compose.yaml # Docker Compose 설정 파일
Airflow를 이용하기 위한 DAG를 작성하는 과정에서, 기존에 사용하고 있던 Custom Module을 사용할 일이 있었다.
Airflow는 위와 같은 디렉터리 구조를 가지며, Custom Module은 plugins 하위에 두고 import 할 수 있다.

http://localhost:8080
작성한 스크립트를 ~/airflow/dags/ 경로에 두고, docker-compose를 실행하면 Airflow UI에 접근할 수 있다.
Airflow 브라우저에 접속하기 위해서는 로그인이 필요한데, 스크립트에 작성한 DAG 기본 설정의 owner에 따라 브라우저(localhost:8080)에 접속했을 때 DAG가 보이지 않을 수 있다.
나는 docker-compose.yaml 파일에 airflow 웹서버를 실행할 때, admin 계정을 생성하도록 설정했기 때문에, 테스트 과정에서는 owner를 admin으로 사용했다.
# DAG 정의
default_args = {
'owner': 'admin',
'depends_on_past': False,
'email_on_failure': True,
'email': ['test@ma.il'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
수정한 내용을 반영하여 docker-compose를 재실행한다.
docker-compose restart

접속한 Airflow 브라우저에서 DAG를 활성화하여 업무를 실행할 수 있다.

내가 추가한 Custom Module에서는 pymysql을 import하여 사용하고 있는데, 이 부분에서 오류가 발생했다.
이를 해결하기 위해서는 Dockerfile에 pymysql을 설치하도록 작성하고, docker-compose.yaml 파일에서는 Dockerfile을 사용할 수 있도록 작성했다.
FROM apache/airflow:[version]
RUN pip install pymysql
Dockerfile에서는 사용할 베이스 airflow 이미지를 설정하고, pymysql을 설치하도록 작성한다.
services:
airflow-webserver:
build: .
...
Dockerfile과 docker-compose.yaml 파일을 같은 디렉터리에 위치하게 하면, build: .를 통해 docker-compose.yaml 파일이 위치한 폴더에서 Dockerfile을 찾아 사용한다.
docker-compose up --build
수정한 내용을 반영하기 위해 docker-compose를 빌드하여 실행한다.
Airflow 브라우저에서 DAG를 활성화한 후에 실행했는데, queued 상태에서 아무 일도 일어나지 않았다.

airflow의 웹 서버와 스케줄러 컨테이너가 모두 실행되고 있었기 때문에 원인을 바로 파악할 수 없었는데, 결과적으로 원인은 스케줄러가 웹 서버에서 실행되고 있지 않아서 발생하는 오류였다.
docker exec -it <webserver-container-name> bash
실행 중인 airflow 웹 서버 컨테이너에 접속한다.
airflow scheduler
접속한 컨테이너에서 스케줄러를 실행한다.

웹 서버에서 스케줄러를 실행한 이후에는 DAG를 Trigger할 수 있었다.

처음에는 crontab을 이용해서도 주기적인 프로세스 실행이 가능한데, Airflow를 이용하는 이유가 무엇일지 궁금해서 사용하게 되었다.
실제로 사용해 보니, 브라우저를 이용하여 쉽게 모니터링할 수 있고 다양한 설정을 직관적으로 확인하여 적용할 수 있다는 장점이 있었다.
특히나 Task의 의존성을 다이어그램으로 확인할 수 있고, Task의 성공 여부와 실행 로그를 개별로 확인할 수 있다는 점이 좋았다.
아직 Airflow와 브라우저에 대해 모든 것을 알고 있는 것은 아니지만, 가장 최근에 구축한 데이터 파이프라인에 Airflow를 도입하면서 앞으로도 데이터 수집과 관리에 대해 효과적인 방법을 찾아가야겠다고 생각했다.