Airflow를 이용한 데이터 파이프라인 구축하기

..·2025년 1월 29일

finder

목록 보기
16/23

데이터 파이프라인을 구축하면서 부족한 점을 보완하기 위해 airflow를 적용한 과정과 이 과정에서 마주한 오류에 대해서 작성해 보았다.

Airflow를 사용하려는 이유

데이터 파이프라인의 완성도

데이터를 다루는 일을 하게 되면서, 기존의 데이터 수집 프로세스를 유지보수하는 것뿐만 아니라 내가 직접 새로운 데이터를 수집할 일도 생겼다.
이 과정에서 대부분의 스크립트는 기존 프로세스의 내용을 참고하여 작성했는데, 내가 생각하기에 가장 개선이 필요한 부분은 crontab을 이용한다는 점이었다.

crontab을 이용하면 "주기적으로 프로세스를 실행"할 수 있지만, 프로세스 내 역할을 분리할 필요가 있었고 실행한 모든 프로세스에 대해 통합된 로그에서 오류를 찾아야 한다는 번거로움이 있었다.
Airflow는 스크립트 내에서 Task라는 단위로 업무를 분리할 수 있고, Task마다 실행 성공 여부와 함께 각각의 로그도 확인할 수 있다는 장점이 있어서 사용하기로 결심했다.


Airflow 설치

mkdir airflow && cd airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/[version]/docker-compose.yaml'

가장 먼저 airflow 디렉터리를 생성하고, Docker Compose 파일을 생성했다.

mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

airflow의 하위 디렉터리를 생성하고 권한을 부여했다.

docker-compose up airflow-init
docker-compose up -d

airflow를 초기화하면 다음과 같은 화면을 확인할 수 있다.


기존 Python 스크립트 수정하기

기존 프로세스에서는 데이터를 비동기 수집하고, 수집한 데이터를 가공한 후에 적재하도록 작성했다. 하나로 통합된 데이터 수집, 가공, 적재 과정에서는 오류 발생 위치를 한 번에 파악하기 어려워 오류에 대한 모니터링 수준이 현저히 떨어졌고 오류 발견부터 수정까지 가는 과정에 번거로움이 있었다.

Airflow에서는 DAG(Directed Acyclic Graph)를 사용하는데, DAG는 작업을 순서에 맞게 진행할 수 있도록 한다.
1번 Task는 데이터를 수집한 후에 적재하도록, 2번 Task는 수집한 데이터를 가공하고 2차 적재하도록 작성했다.

스크립트 내에서 DAG를 정의할 때, Task 의존성에 대해 작성할 수 있다.
내가 작성한 DAG에서는 task1 >> task2라고 의존성에 대해 작성했는데, 이는 task1이 성공적으로 실행했을 경우에만 task2가 실행된다.
이처럼 DAG를 이용한다면 하나의 프로세스에서도 업무를 구분하여 실행할 수 있고, Task의 성공 여부에 따라 이후 Task 실행 여부를 판단하여 추가적인 오류 발생을 막을 수 있다.


Custom Module 추가하기

airflow/
├── dags/
│   ├── dag.py                 # DAG 정의 파일
├── plugins/                   # 커스텀 모듈 또는 플러그인 위치
│   ├── custom_module.py       # 직접 작성한 모듈
├── logs/                      # Airflow 로그 디렉터리
├── docker-compose.yaml        # Docker Compose 설정 파일

Airflow를 이용하기 위한 DAG를 작성하는 과정에서, 기존에 사용하고 있던 Custom Module을 사용할 일이 있었다.
Airflow는 위와 같은 디렉터리 구조를 가지며, Custom Module은 plugins 하위에 두고 import 할 수 있다.


DAG 활성화하기

http://localhost:8080

작성한 스크립트를 ~/airflow/dags/ 경로에 두고, docker-compose를 실행하면 Airflow UI에 접근할 수 있다.

주의사항: owner를 부여한 계정으로 로그인하기

Airflow 브라우저에 접속하기 위해서는 로그인이 필요한데, 스크립트에 작성한 DAG 기본 설정의 owner에 따라 브라우저(localhost:8080)에 접속했을 때 DAG가 보이지 않을 수 있다.
나는 docker-compose.yaml 파일에 airflow 웹서버를 실행할 때, admin 계정을 생성하도록 설정했기 때문에, 테스트 과정에서는 owner를 admin으로 사용했다.

# DAG 정의
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['test@ma.il'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

수정한 내용을 반영하여 docker-compose를 재실행한다.

docker-compose restart

접속한 Airflow 브라우저에서 DAG를 활성화하여 업무를 실행할 수 있다.

🚨 Broken plugin Error

내가 추가한 Custom Module에서는 pymysqlimport하여 사용하고 있는데, 이 부분에서 오류가 발생했다.
이를 해결하기 위해서는 Dockerfilepymysql을 설치하도록 작성하고, docker-compose.yaml 파일에서는 Dockerfile을 사용할 수 있도록 작성했다.

  • Dockerfile
FROM apache/airflow:[version]

RUN pip install pymysql

Dockerfile에서는 사용할 베이스 airflow 이미지를 설정하고, pymysql을 설치하도록 작성한다.


  • docker-compose.yaml
services:
  airflow-webserver:
	  build: .
      
      ...
      

Dockerfiledocker-compose.yaml 파일을 같은 디렉터리에 위치하게 하면, build: .를 통해 docker-compose.yaml 파일이 위치한 폴더에서 Dockerfile을 찾아 사용한다.


docker-compose up --build

수정한 내용을 반영하기 위해 docker-compose를 빌드하여 실행한다.



🚨 DAG가 실행되지 않는 현상

Airflow 브라우저에서 DAG를 활성화한 후에 실행했는데, queued 상태에서 아무 일도 일어나지 않았다.

airflow의 웹 서버와 스케줄러 컨테이너가 모두 실행되고 있었기 때문에 원인을 바로 파악할 수 없었는데, 결과적으로 원인은 스케줄러가 웹 서버에서 실행되고 있지 않아서 발생하는 오류였다.

해결 방법

docker exec -it <webserver-container-name> bash

실행 중인 airflow 웹 서버 컨테이너에 접속한다.

airflow scheduler

접속한 컨테이너에서 스케줄러를 실행한다.

웹 서버에서 스케줄러를 실행한 이후에는 DAG를 Trigger할 수 있었다.


마무리

처음에는 crontab을 이용해서도 주기적인 프로세스 실행이 가능한데, Airflow를 이용하는 이유가 무엇일지 궁금해서 사용하게 되었다.
실제로 사용해 보니, 브라우저를 이용하여 쉽게 모니터링할 수 있고 다양한 설정을 직관적으로 확인하여 적용할 수 있다는 장점이 있었다.
특히나 Task의 의존성을 다이어그램으로 확인할 수 있고, Task의 성공 여부와 실행 로그를 개별로 확인할 수 있다는 점이 좋았다.

아직 Airflow와 브라우저에 대해 모든 것을 알고 있는 것은 아니지만, 가장 최근에 구축한 데이터 파이프라인에 Airflow를 도입하면서 앞으로도 데이터 수집과 관리에 대해 효과적인 방법을 찾아가야겠다고 생각했다.

0개의 댓글