Airflow란 ?

  • Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임워크
    • Airbnb에서 시작한 아파치 오픈소스 프로젝트
    • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
  • 데이터 파이프라인 스케쥴링 지원
    • 정해진 시간에 ETL 실행 혹은 한 ETL의 실행이 끝나면 다음 ETL 실행
    • 웹 UI를 제공하기도 함
      • 문제가 생긴 데이터파이프라인이 있으면 재실행도 쉬움 (Backfill과 연결)
      • 로그 확인
    • 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
      • 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
      • 데이터 파이프라인 관리 관련 다양한 기능을 제공해줌: 특히 Backfill
    • Airflow에서는 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
      • 하나의 DAG는 하나 이상의 태스크로 구성됨
    • 2020년 12월에 Airflow 2.0이 릴리스됨
    • Airflow 버전 선택 방법 : 큰 회사에서 사용하는 버전이 무엇인지 확인.

Airflow 구성

  • 총 5개의 컴포넌트로 구성
  1. 웹 서버(Web Server) - Python flask로 구현되어있음
  2. 스케줄러 (Scheduler) - 데이터 파이프라인을 정해진 시간에 실행해주거나 다음꺼를 트리거해주는걸 전담
  3. 워커 (Worker) - 실제 태스크를 실행하는 모듈. 스케일링이 이 워커를 늘리는 것
  4. 메타 데이터 데이터베이스 - 스케쥴링 정보 워커 정보를 기록하는 용도로 쓰이는 곳.
    a. Sqlite가 기본으로 설치됨.싱글쓰레드 파일기반으로 테스트 사용으로도 어려워 별도로 mysql, postgres 설치받아 사용이 일반적
  5. 큐(다수 서버 구성인 경우에만 사용됨) - 태스크가 다수의 워커로 분산이 되어서 처리되려면 필요한 중간 매개체.
    a. 이 경우 Executor가 달라짐
  • 스케줄러는 DAG들을 워커들에게 배정하는 역할을 수행
  • 웹 UI는 스케줄러와 DAG의 실행 상황을 시각화해줌
  • 워커는 실제로 DAG를 실행하는 역할을 수행
  • 스케줄러와 각 DAG의 실행결과는 별도 DB에 저장됨
    • 기본으로 설치되는 DB는 SQLite
    • 실제 프로덕션에서는 MySQL이나 Postgres를 사용해야함

Airflow 구조 : 서버 한대


Worker의 수는 서버의 CPU개수에 따라 결정됨. CPU가 6대이면 worker도 6개가 돌게 됨.

한대로 가면 한대의 용량 제한(CPU)이 있다보니 동시에 실행할 수 있는 task 수 제약이 있게 되고, 데이터 파이프라인이 늘어나면 한대로 부족하게 될거임. 스케일링을 할 경우 워커를 별도의 서버에 세팅하고 워커가 있는 서버의 수를 늘리는 형태로 용량을 증대한다.

Airflow 스케일링 방법

  • 스케일 업 ( 더 좋은 사양의 서버 사용)
  • 스케일 아웃 ( 서버 추가)

Airflow 구조 : 다수 서버


마스터 노드에는 웹서버와 스케줄러가 있고 메타데이터DB와 연결되어있음. 다수의 워커 노드들이 있을거고 이것과 통신은 큐를 통해서 하게됨. 스케쥴러가 어떤 태스크를 바로 워커에 넘기는게 아니라 에어플로우에 세팅되는 executor를 통해서 워커에게 넘기게 되는데 executor가 무엇인지에 따라 큐를 쓰고 안쓰고가 달라진다.
bottleneck은 워커에 있으므로 서버들을 워커전용으로 할당하기 시작한다.


코드를 만드는 Dag Directory가 있다. airflow 서버안에 특정 폴더안에 파이썬으로 작성한 데이터 파이프라인 코드를 위치시키게 되는데 에어플로우가 주기적으로 이 디렉토리를 파싱한다.

  • 여러 종류의 Executor들
    • Sequential Executor
    • Local Executor
    • Celery Executor
    • Kubernetes Executor
    • CeleryKubernetes Executor
    • Dask Executor

Airflow 개발의 장단점

  • 장점
    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 데이터 웨어하우스를 지원
    • 백필(Backfill)이 쉬움
  • 단점
    • 배우기가 쉽지 않음
    • 상대적으로 개발환경을 구성하기가 쉽지 않음
    • 직접 운영이 쉽지 않음. 클라우드 버전 사용이 선호됨
      • GCP: Cloud Composer
      • AWS: Managed Workflows for Apache Airflow(MWAA)
      • Azure: Data Factory Managed Airflow

DAG란 무엇인가 ?

  • Directed Acyclic Graph의 줄임말
  • Airflow에서 ETL을 부르는 명칭
  • DAG는 태스크로 구성됨
    • 예를 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
  • 태스크란 ? - Airflow의 오퍼레이터(Operator)로 만들어짐
    • Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
    • 경우에 맞게 사용 오퍼레이터를 결정하거나 필요하다면 직접 개발
    • e.g., Redshift writing, Postgres query, S3 Read/Write, Hive Query, Spark job, shell script

DAG의 구성 예제

  • 3개의 Task로 구성된 DAG.
  • 먼저 t1이 실행되고 t2, t3의 순으로 일렬로 실행
  • 3개의 Task로 구성된 DAG.
  • 먼저 t1이 실행되고 여기서 t2와 t3로 분기

Airflow 설치 방법

  • 직접 설치하고 운영
    1. 개인 랩탑에 도커 설치 후 Airflow 설치
    2. AWS EC2 등의 리눅스 서버에 직접 설치
      a. ubuntu 20.04
  • 클라우드 사용(프로덕션 환경에서 선호됨)
    • AWS: MWAA
    • 구글 클라우드 : Cloud Composer 사용
    • Microsoft Azure: Azure Data Factory에 Airflow DAGs 기능 존재

Ubuntu 20.04 사용 Example

  • AWS EC2 t3.small나 t3a.small 인스턴스 사용
    • 2 CPU, 2GB Memory, 8GB SSD Disk 사용
  • AWS 계정 필요
    • 위 인스턴스 타입은 Free Tier가 아니므로 비용 발생
      • t3.small 인스턴스의 경우 한달 비용이 $18.72
      • t3a.small 인스턴스의 경우 한달 비용이 $16.85
    • 가능하면 실제 개발 실습은 Docker 환경 사용

리눅스 이해

  • Ubuntu: 리눅스 타입 중의 하나. 다른 타입은 데비안, 레드햇, 페도라, ...
  • ssh : 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)
    • private key와 public key를 사용
  • sudo: 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램이다.
  • apt-get: 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는 프로그램
    • apt-get update, apt-get install
  • su: substitute user의 약자로 현재 사용중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용한다.
  • vi: 텍스트 에디터.

Airflow 2.9.1 설치 과정

  • 최신 안정 버전을 찾아서 2.9.1을 적절히 변경해서 실행할 것
  • Airflow 메타데이터베이스로 로컬 서버에 Postgres를 설치할 예정
  • Airflow는 /var/lib/airflow/ 밑에 설치됨
  • Airflow 서버에 총 3개의 어카운트가 사용됨
    • ubuntu: 메인 어카운트
    • postgres: (postgres 설치시 만들어지는 계정)
      • 이 계정을 이용해 postgres 액세스를 위한 airflow 계정을 별도로 생성
    • airflow: Airflow용 어카운트. Airflow 서비스는 이 계정으로 실행됨
  1. ubuntu로 로그인해서
  • python 업그레이드
  • airflow 사용자 생성
  • airflow 모듈 설치
  • postgres 설치
  1. postgres로 어카운트 변경
  • postgres에 airflow 계정 생성
  1. airflow로 어카운트 변경
  • airflow.cfg 변경
  • airflow 재설치
  • Airflow web 로그인 계정 생성
  1. ubuntu로 어카운트 변경
  • Airflow 웹서버와 스케줄러를 서비스로 등록하고 시작

이건 따로 실습 X

Mac에서 Docker 기반 Airflow 실행

docker-engine에 들어가보면 이렇게 airflow 실행을 위한 여러 컨테이너들이 구동되고 있는 것을 확인할 수 있다.

localhost:8080으로 접속하여 id airflow pwd airflow을 입력하면 로그인 가능하고, 위와 같은 화면을 확인할 수 있다. 이미 만들어져있는 dag들은 예제 pipeline이다.

Airflow 코드의 기본 구조

  • DAG 대표하는 객체를 먼저 만듬
    • DAG 이름, 실행주기, 실행날짜, 오너 등등
  • 다음으로 DAG를 구성하는 태스크들을 만듬
    • 태스크별로 적합한 오퍼레이터를 선택
    • 태스크 ID를 부여하고 해야할 작업의 세부사항 지정
  • 최종적으로 태스크들간의 실행순서를 결정
from airflow import DAG

dag = DAG(
  "dag_v1" #DAG name
  start_date=datetime(2020,8,7,hour=0,minute=00),
  schedule="0****",
  tags=["example"],
  catchup=False,
  # common settings
  default_args = default_args
)
  • start_date와 end_date는 DAG가 언제 시작하고 멈추는지를 특정한다.
    • can be used to do one-off backfilling
    • catchup의 의미를 이해하는 것이 중요하다.
      만약 현재가 2023년 8월 7일이면 지난 3년간의 실행 안된 날짜들에 대해서 매시간마다 대그를 실행하려고 한다. 그걸 원하지 않으면 catchup=False로 하면 갭에 대해서 아무 행동을 안한다. full-refresh일수록 더더욱 하면 안되고(어차피 결과는 똑같기 때문), increment 데이터파이프라인에서만 의미있음
  • Schedule interval은 cron 표현식이나 아래와 같은 preset으로 표현될 수 있다.
    • None, @once, @hourly, @daily, @weekly, @monthly, @hearly

Bash Operator를 사용한 예제

  • 3개의 태스크로 구성
  • t1은 현재 시간 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta


default_args = {
   'owner': 'keeyong',
   'start_date': datetime(2023, 5, 27, hour=0, minute=00),
   'email': ['keeyonghan@hotmail.com'],
   'retries': 1,
   'retry_delay': timedelta(minutes=3),
}

test_dag = DAG(
   "dag_v1", # DAG name
   schedule="0 9 * * *", 
   tags=['test'],
   catchup=False,
   default_args=default_args 
)

t1 = BashOperator(
   task_id='print_date',
   bash_command='date',
   dag=test_dag)

t2 = BashOperator(
   task_id='sleep',
   bash_command='sleep 5',
   dag=test_dag)

t3 = BashOperator(
   task_id='ls',
   bash_command='ls /tmp',
   dag=test_dag)

t1 >> [ t2, t3 ]

How to Trigger a DAG in Terminal

  • 먼저 Airflow 서버에 로그인하고 다음 명령 실행
  airflow dags list
  airflow tasks list DAG이름
  airflow tasks test DAG이름 Task이름 날짜 # test vs run
  • 날짜는 YYYY-MM-DD
    - start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
    - 이게 바로 execution_date의 값이 됨

profile
keep growing

0개의 댓글