Airflow 프로그래밍 기본 세팅 (TIL 32)

석형원·2024년 5월 22일

TIL

목록 보기
32/52

✏️ 오늘 학습한 내용

1. Airflow 설치
2. Airflow 기본 프로그램 실행


🔎 Airflow 설치

Airflow 설치 방법 두가지

  • 직접 설치하고 운영하는 방법

    • 개인이 사용하기에 적합
  • 클라우드를 사용하는 방법

    • 회사 차원에서 사용하기에 적합
    • 프로덕션 환경에서 선호됨.
    • 종류
      • AWS : MWAA(Managed Workflows for Apache Airflow)
      • 구글 클라우드 : Cloud Composer
      • MS Azure : Azure Data Factory의 Airflow DAGs 기능

📃 EC2 리눅스 서버에 설치하는 방법

Ubuntu 20.04 사용

  • AWS EC2 t3.small나 t3a.small 인스턴스를 선택
    ( 2 CPU, 2 GB Memory, 8GB SSD Disk )

  • AWS 계정 필요

    • Free Tier가 아니기에 비용이 발생

      t3.small 인스턴스의 경우 한달 비용이 $18.72
      t3a.small 인스턴스의 경우 한달 비용이 $16.85

리눅스 용어

  • ubuntu : 리눅스 타입 중의 하나

    • 리눅스 타입 : 데비안, 레드헷, 페도라, ...
  • ssh : 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)

    • private key와 public key 사용
  • sudo : 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램

  • apt-get: 우분투 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는 프로그램

    • apt-get update
    • apt-get install
  • su : substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용.

  • vi : 텍스트 에디터.

설치 과정

아래 링크의 Readme를 참고하여 설치를 진행

https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%202%20Installation.md

  • 위 버전은 2.5.1을 기준으로 설치
  • Ubuntu : 메인 Account
  • Postgres
    • Metadata DB에 Postgres를 설치
    • 설치시 만들어지는 계정을 이용해 postgres 액세스를 위한 Airflow 사용자를 추가
  • Airflow : Airflow용 Account.
    (Airflow 서비스는 이 계정으로 실행됨)

📃 도커를 사용하여 설치하는 방법

  • Docker Engine 리소스
    • 맥에서는 메모리 4GB가 필요
    • 윈도우에서는 6GB가 필요

설치 과정

  • airflow-setup Github repo 클론

    • git clone https://github.com/keeyong/airflow-setup.git
  • 2.5.1 이미지 관련 yml 파일 다운로드

    cd airflow-setup
    curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
  • 이미지 다운 및 컨테이너 실행

    docker-compose -f docker-compose.yaml pull
    docker-compose -f docker-compose.yaml up
  • http://localhost:8080으로 웹 UI 로그인

    • airflow:airflow 사용 (ID/PW)

🔎 Airflow 기본 프로그램 실행

Airflow 코드의 기본 구조

  1. DAG 대표하는 객체를 먼저 생성
    ( DAG 이름, 실행 주기, 실행 날짜, 오너 등 )

  2. 다음으로 DAG를 구성하는 태스크를 생성
    2-1. 태스크 별로 적합한 오퍼레이터를 선택
    2-2. 태스크 ID를 부여하고 작업의 세부사항 지정

  3. 최종적으로 태스크들 간의 실행 순서를 결정

📃 DAG 설정 예제

DAG를 설정할 때 가장 먼저 해야하는 작업이 있습니다.

이 모든 태스크들에 공통으로 적용이 되는 설정을 Dictionary로 만들어 두어야합니다.
( 보통 default_args라 지칭 )

default_args 생성

from datetime import datetime, timedelta
default_args = {
 # 오너가 누구인지
 'owner': 'keeyong',
 # 오너의 이메일은 무엇인지
 'email': ['keeyonghan@hotmail.com'],
 # 작업이 실패했을 때 재시도를 몇 번 할 것인지
 'retries': 1,
 # 재시도 간에 얼마나 기다릴지 (3분)
 'retry_delay': timedelta(minutes=3),
}
  • 여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됩니다.

  • on_success_callback, on_failure_callback
    태스크가 성공 혹은 실패했을 때, 내가 어떤 함수를 호출하고 싶은 경우 사용합니다.

DAG 생성

from airflow import DAG
dag = DAG(
 # DAG 이름
 "dag_v1",
 # DAG 시작 날짜
 start_date=datetime(2020,8,7,hour=0,minute=00),
 # 이 스케줄이 뜻하는 바는 모든 0분
 # 즉, 매 시간마다의 0분에 실행
 schedule="0 * * * *",
 # 태그 선택
 tags=["example"],
 catchup=False,
 # 위에서 지정한 default_args를 세팅
 default_args=default_args 
)
  • 스케줄 범위

    앞에서부터 분,시간,월,년,요일 순으로 사용합니다.

    만약, 실행을 주기적으로 하지않겠다면
    None 혹은 @once로 세팅 후 다른 DAG가 종료되었을 때, 해당 DAG를 트리거하는 방식으로 사용도 가능합니다.

  • Schedule의 cron expression

    • None, @once, @hourly, @daily, @weekly, @monthly, @yearly

    • e.g.) 매년 매일 매시 정각에 실행을 하고 싶은 경우
      @hourly

  • catchup 이란?

    예를 들어, start_date가 과거로 설정됐을 경우 Airflow는 설정된 과거부터 현재의 시간까지의 기간동안 실행이 밀렸다고 생각합니다.

    따라서, 밀린 해당 태스크를 실행하기 위해 그 태스크들의 스케줄에 해당되는 시간이 오면 밀린 태스크를 실행하는 것이 catchup입니다.

    -> Default가 catchup=True이므로,
    원치 않는다면 catchup=False로 바꿔서 사용해줘야합니다.

  • e.g.)

    Full Refresh를 하는 DAG의 경우 catchup은 항상 False로 하는 것이 좋습니다.

    그 이유는 과거의 태스크들을 모두 실행하는 것이나 지금 한번 DAG를 실행하는 것이나 똑같기 때문입니다.

    결국, 이 catchup은 incremental update를 하는 Data Pipeline에서만 의미가 있습니다.
    대부분의 경우에서는 False가 좋습니다.

Bash Operator 사용 예제

  • 3개의 태스크로 구성
  • t1은 현재 시간 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2,t3를 병렬로 실행
# DAG import
from airflow import DAG
# Bash Operator import
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
 'owner': 'keeyong',
 'start_date': datetime(2023, 5, 27, hour=0, minute=00),
 'email': ['keeyonghan@hotmail.com'],
 'retries': 1,
 'retry_delay': timedelta(minutes=3),
}

test_dag = DAG(
 "dag_v1", # DAG name
 schedule="0 9 * * *", 
 tags=['test'],
 catchUp=False,
 default_args=default_args 
)

# BashOperator로 Task 생성

t1 = BashOperator(
 task_id='print_date',
 # 지금 시간 출력
 bash_command='date',
 dag=test_dag)
 
t2 = BashOperator(
 task_id='sleep',
 # 5초 동안 정지
 bash_command='sleep 5',
 dag=test_dag)
 
t3 = BashOperator(
 task_id='ls',
 # /tmp 내용 출력
 bash_command='ls /tmp',
 dag=test_dag)
 
 # t1이 끝나면 t2,t3를 동시 실행
 t1 >> [ t2, t3 ]

DAG - 터미널에서 실행

  • 터미널에서 Docker로 Airflow 로그인

    # docker의 컨테이너들을 확인해서
    # 스케줄러에 접근하기 위한 ID를 찾음
    docker ps
    
    # 컨테이너 안으로 로그인.(airflow)
    docker exec -it container_id sh
    
  • Airflow 로그인 후 명령어 실행

    # DAG 리스트 출력
    airflow dags list
    
    # 선택한 DAG 내의 태스크 리스트 출력
    airflow tasks list DAG이름
    
    # DAG의 특정 Task를 실행하고 싶은 경우
    airflow tasks test DAG이름 Task이름 날짜
    # test와 run은 동일한 기능.
    # test의 경우 metadata에 기록이 됨.
    # run의 경우 기록이 되지 않음
profile
데이터 엔지니어를 꿈꾸는 거북이, 한걸음 한걸음

0개의 댓글