Product Serving(3): Airflow practice 1

SeongGyun Hong·2024년 12월 10일

NaverBoostCamp

목록 보기
42/64

1. 기본 설치 및 실행

  1. Airflow 설치
pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
  • Airflow 2.6.3 버전 설치.
  1. Airflow 디렉토리 설정
export AIRFLOW_HOME=`pwd`
  • Airflow 홈 디렉토리 설정.
  1. Airflow DB 초기화
airflow db init
  • Airflow DB 초기화.
  1. Airflow 사용자 계정 생성
airflow users create --username <사용자이름> --firstname <이름> --lastname <> --email <이메일> --role Admin --password <비밀번호>
  • 관리자 계정 생성.
  1. 웹 서버 실행
airflow webserver --port 8080
  • Airflow 웹 서버 실행.
  1. 스케줄러 실행
airflow scheduler
  • Airflow 스케줄러 실행.

  • 웹 UI는 localhost:8080에서 접속.
  • Python 3.7 이상 필요.

2. Airflow Hello world!: Script

  1. pwd로 설정했던 AIRFLOW_HOME에서 DAG를 저장할 dags 폴더 만들기

    mkdir dags

  2. Airflow DAG 파일은 크게 3가지 파트로 나뉘는데
    1) DAG 정의
    2) TASK 정의
    3) Task 순서 정의(연결)

    위 세가지다.

  3. 실무에서 Airflow를 사용할 때 중요한 인자

  • catchup

    • 과거에 지나간 일자의 DAG를 실행할지 결정하는 옵션

      • True: DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG를 실행함. 과거 데이터를 처리할 필요가 있을 때 유용하다.

      • False: DAG에서 정의한 start_date와 상관없이 앞으로 실행될 DAG를 실행

  • depends_on_past

    • 특정 Task가 이전 DAG 실행 결과에 의존할지 여부를 결정함

    • 이전 Task와 상관없이 작업을 수행하고 싶은지 고민

    • 하루 단위의 작업들이 의존성이 있다면 True를 주고 순차적으로 실행

      • True: 이전 DAG이 성공으로 완료되어야 이후 DAG이 실행됨
      • False: 이전 DAG의 성공 여부와 상관없이 스케줄이 되면 DAG를 실행
        다만, Executor에 따라 여러가지 작업을 한번에 실행할 수도 있음

실습 요약

  1. DAG 디렉토리 설정

    • AIRFLOW_HOME/dags 디렉토리를 생성.
    • DAG 파일은 이 디렉토리에 .py 파일로 저장.
  2. DAG 파일 구성

    • DAG 정의: Workflow 전체 구조를 정의 (DAG 이름, 스케줄 간격, 기본 설정 포함).
    • Task 정의: DAG 내에서 실행되는 작업을 Python Operator로 정의 (또는 BashOperator, Operator는 다양하다)
    • Task 순서 정의: 작업 간의 실행 순서를 지정 (task1 >> task2 형태).
  3. DAG 파일 저장 및 확인

    • .py 파일로 저장 후, Airflow 웹 UI에서 DAG 목록을 확인 가능.
    • UI 경로: localhost:8080.
  4. DAG 활성화 및 실행

    • 웹 UI에서 해당 DAG를 ON으로 전환하면 스케줄러에 의해 DAG 실행.
  5. 실행 결과 확인

    • DAG 세부 페이지에서 실행된 DAG Run의 결과 및 상태 확인 가능.

추가설명:

  • DAG 파일은 Airflow가 자동으로 감지하므로 서버 재시작 불필요.
  • schedule_interval에 따라 주기적으로 실행 가능 (@daily, @hourly 등).
  • Task는 독립적 실행도 가능하며, 실패 시 재시도 옵션 설정 가능.

3. Airflow의 다양한 Operator

  • PythonOperator

    • 파이썬 함수를 실행
    • 함수 뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행 가능
    • 실행할 파이썬 로직을 함수로 생성한 후 PythonOperator로 실행
        t2 = PythonOperator(
          task_id="print_world",
          python_callable=print_world,
          depends_on_past = True,
          owner = "hsk",
          retries = 3,
          retry_delay = timedelta(minutes =5),
      )
  • BashOperator

    • Bash 커맨드를 실행
    • 실행해야 할 프로세스가 파이썬이 아닌 경우에도 BashOperator로 실행 가능
      • ex) shell 스크립트, scala 파일 등
          t1 = BashOperator(
          task_id = "print_hello",
          bash_command = "echo Hello",
          owner="hsk", # 이 작업의 오너, 보통 작업담당자 이름,
          retries = 3, # Task 실패시 재시도할 횟수
          retry_delay = timedelta(minutes=5) # 재시도 간격 : 5분
      )
  • DummyOperator

    • 아무것도 실행하지 않음
    • DAG 내에서 Task를 구성할 때, 여러 개의 Task의 SUCCESS를 기다려야 하는 복잡한 TAsk 구성에서 사용

      위와 같은 식으로 F로 넘어가기 직전에 E로 작업을 다 저장해두는 역할
  • SimpleHttpOperator
    파이썬에서 requests 모듈로도 가능하지만 이런 걸로도 가능하다.

    • 특정 호스트로 HTTP 요청을 보내고, Response를 반환한다.
    • 파이썬 함수에서 requests 모듈을 사용한 뒤 PythonOperator로 실행시켜도 무방
    • 다만 이런 기능이 Airflow Operator에 이미 존재한다는 것을 알면 좋음
  • 클라우드 기능을 추상화한 Operator도 존재한다(AWS, GCP 등) - Provider Packages

    • 다만, 외부 Third Party와 연동해 사용하는 Operator의 경우 (docker, aws, gcp 등) Airflow 설치 시에 다음처럼 extra package를 설치해야 함.
      pip install "apache-airflow[aws]"
  • BranchPythonOperator

    • 특정 조건에 따라 실행을 제어하는 Operator
    • 특정 상황엔 A 작업, 없으면 Pass
      • ex1)
        학습한 결과 Accuracy가 기존 모델보다 좋으면 저장 후 모델 업데이트, 좋지 않으면 저장만 진행
      • ex2)
        특정 일자 전에는 A모델, 그 이후에는 B 모델
  • Operator 외에도 알아두면 좋은 내용
    Airflow DAG를 더 풍부하게 작성할 수 있는 개념

    • Variable:
      Airflow Console에서 변수(Variable)를 저장해 Airflow DAG에서 활용
    • Connection & Hooks:
      연결하기 위한 설정(MySQL, GCP 등)
    • Sensor:
      외부 이벤트를 기다리며 특정 조건이 만족하면 실행
    • XComs: Task 끼리 결과를 주고받고 싶은 경우 사용
    • Jinja Template: 파이썬의 템플릿 문법. FastAPI에서도 사용


      자세한 내용은 Airflow DAG를 하나씩 만들면서 사용해보는 것을 추천

4. Task가 실패할 때 Slack 메시지 전송

  • Slack 연동
  • DAG 실패 알림
    => Connection 사용

  1. Airflow Slack Provider 설치

  2. Slack Webhook URL 따기

  3. Webserver 통하여 [Admin]-[Connections]-[Add a new record] 경로로 Connection 생성

  4. Add Copnnection 채우기

  5. Task가 실패할 때 알림 보내주는 코드 작성

  • dags/utils 폴더 생성후에
  • init.py 생성
  • slack_notifier.py 생성
  • utils 폴더 안에서 작성한 파일을 import해서 사용하기 위함.
  • 이후 dags 폴더 안에서 DAG 코드 작성
    DAG 코드 예시는 아래와 같음
    DAG 코드 예시

5. Airflow 아키텍쳐

기본 아키텍쳐


출처: https://medium.com/@bageshwar.kumar/airflow-architecture-a-deep-dive-into-data-pipeline-orchestration-217dd2dbc1c3

  • Scheduler
    각종 메타 정보의 기록을 담당
    • DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
    • DAG들의 스케줄링 돤리 및 담당
    • 실행 진행 상황과 결과를 DB에 저장
    • Executor를 통해 스케줄링 기간이 된 DAG를 실행
    • Airflow에서 가장 중요한 컴포넌트

Executor
스케줄링 기간이 도래한 DAG를 실행하는 객체를 의미

  • Local Executor
    DAG Run을 프로세스 단위로 실행한다.
    • 하나의 DAG Run을 하나의 프로세스로 띄워서 실행하고
    • 최대로 생성할 프로세스 수를 정해야 한다.
    • Airflow를 간단하게 운영할 때 적합하다.
  • Sequential Executor
    • 하나의 프로세스에서 모든 DAG Run들을 처리
    • Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용한다.
    • Airflow를 테스트로 잠시 운영할 때 적합하거나 잘 사용하지는 않음
  • Remote Executor
    DAG Run을 외부 프로세스로 실행
    • Celery Executor
      • DAG Run을 Celery Worker Process로 실행
      • 보통 Redis를 중간에 두고 같이 사용
      • Local Executor를 사용한다. Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
  • Kubernetes Executor
    • 쿠버네티스 상에서 Airflow를 운영할 때 사용
    • DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테니어 같은 개념)
    • Airflow 운영 규모가 큰 팀에서 사용
  • Workers
    DAG의 작업을 수행

    • Scheduler에 의해 생기고 실행
    • DAG Run을 실행하는 과정에서 생긴 로그를 저장
  • Metadata Database
    메타 정보를 저장

    • Scheduler에 의해 Metadata가 저장
    • 보통 MySQL이나 PostgresQL을 사용
    • 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
    • User와 Role (RBAC)에 대한 정보 저장
    • Scheduler와 더불어 핵심 컴포넌트
      • 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
    • 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 클라우드 DB를 사용
  • Webserver
    WEB UI를 담당

    • Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
    • 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG를 ON/OFF 하며, 현 상황을 파악한다
    • REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않더라도 괜찮다.
    • Webserver가 당장 작동하지 않더라도, Airflow에 큰 장애가 발생하지 않음
      • 반면 Scheduler의 작동 여부는 운영에 매우 치명적이므로 중요

5. 실무에서는 Airflow를 어떻게 쓰나요?

  • Airflow를 구축하는 방법으로 보통 3가지 방법을 사용

      1. Managed Airflow (GCP Composer, AWS MWAA)
        클라우드 서비스 형태로 Airflow를 사용하는 방법
        어떤 클라우드를 쓸지는 선택의 여부
        보통 별도의 데이터 엔지니어가 없고, 분석가로 이루어진 데이터 팀의 초기에 활용하기 좋음
      • 장점
        • 설치와 구축을 클릭 몇번으로 클라우드 서비스가 다 진행해 줌
        • 유저는 DAG 파일을 스토리지(파일 업로드) 형태로 관리
      • 단점
        • 높은 비용, 적은 자유도
        • 클라우드에서 기능을 제공하지 않으면 불가능한 제약이 많음
      1. VM + Docker compose
        직접 VM 위에서 Docker compose로 Airflow로 배포하는 방법
      • 보통 데이터 엔지니어가 적게 존재하는 데이터 팀 성장 초반에 적합
      • 장점
        1) Managed Service 보다는 살짝 복잡하지만, 어려운 난이도가 아님
        2) Docekr와 Docker compose에 익숙한 사람이라면 금방 익힘
        3) 하나의 VM만 사용하기에 단순
      • 단점
        1) 각 도커 컨테니어 별로 환경이 다르므로, 관리포이트가 늘어남 >> 이슈 발생 가능성 상승
        2) 특정 컨테니어가 갑자기 죽을수도 있으며, 특정 컨테이너에 라이브러리를 설치했다면, 나머지 컨테이너에도 하나씩 설치해야함...
      1. Kubernetes + Helm
        Kubernetes 환경에서 helm 차트로 Airflow를 배포하는 방법
      • Kubernetes는 여러 개의 VM을 동적으로 운영하는 일종의 분산환경으로, 리소스 사용이 매우 유연한 것이 특징이다.(필요에 따라 VM 수를 알아서 늘려주고 줄여줌)
      • 이런 특징 덕분에, 특정 시간에 배치 프로세스를 실행시키는 Airflow와 궁합이 매우 잘 맞음
      • Airflow DAG 수가 몇 백개로 늘어나도 노드 오토 스케일링으로 모든 프로세스를 잘 처리할 수 있음
      • 하지만 쿠버네티스 자체가 난이도가 있는 만큼 구축과 운영이 어려움
      • 보통 데이터 팀에 엔지니어링 팀이 존재하고, 쿠버네티스 환경인 경우에 적극 사용함

6. 마무리...

Airflow는 많이 띄운다...
어떤 회사는 데이터 엔지니어링 전용 Airflow를 띄우고
어떤 회사는 MLOps 전용 Airflow를 띄운다.

Airflow를 띄우는 것도 컴퓨터(서버)를 사용하기에 비용을 생각해야한다.

운영하는 DAG 갯수가 적은 경우에 Data Engineering + MLOps 통합 Airflow 1개를 운영하는 경우 또한 존재한다.

  • 핵심 정리
    Apache Airflow
    • Batch Serving, 특정 시간 단위로 실행해야 할 때 사용하는 워크플로우 도구
    • A Task 후, B Task, 그 후 여러 Task를 병렬 실행 등 다양한 기능 지원
    • 데이터 엔지니어링, MLOps에서 많이 사용

언제 쓰는게 적당한가요?

MLOps 에서도 Airflow를 사용하는 경우

  • 주기적인 실행이 필요한 경우
  • Batch Training: 1주일 단위 모델 학습
  • Batch Serving(Batch Inference): 30분 단위로 인퍼런스
  • 인퍼런스 결과를 기반으로 일자별, 주차별 모델 퍼포먼스 Report 생성
  • MySQL에 저장된 메타데이터를 데이터 웨어하우스로 1시간 단위로 옮긴다.
  • Feature Store를 만들기 위해 Batch ETL 실행

관련 추천 글

  1. 버킷플레이스 - Airflow 도입기 - https://www.bucketplace.co.kr/post/2021-04-13-버킷플레이스-airflow-도입기/
  2. 라인 엔지니어링 - Airflow on Kubernetes - https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
  3. 쏘카 Airflow 구축기 & Advanced - https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html - https://tech.socarcorp.kr/data/2022/11/09/advanced-airflow-for-databiz.html
  4. Airflow Executors Explained - https://www.astronomer.io/guides/airflow-executors-explained
profile
헤매는 만큼 자기 땅이다.

0개의 댓글