[프로그래머스] 데브코스 데이터엔지니어링 TIL Day 42

주재민·2023년 12월 12일
0
post-thumbnail

📖 학습주제

데이터 파이프라인, Airflow (2)


Airflow 설치

  1. 직접 설치하고 운영

  2. 클라우드 사용 (프로덕션 환경에서 선호됨)

  • AWS: MWAA (Managed Workflows for Apache Airflow) 사용
  • 구글 클라우드: Cloud Composer 사용
  • Microsoft Azure: Azure Data Factory에 Airflow DAGs 기능 존재

직접 설치 방법

  1. 개인 랩탑에 도커 설치 후 Airflow 설치

  2. AWS EC2 등의 리눅스 서버에 직접 설치

  • 우분투 20.04 사용

리눅스 이해하기

  • 우분투 (ubuntu) : 리눅스 타입 중의 하나. 다른 타입은 데비안, 레드햇, 페도라, ...
  • ssh : 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)
    - private key와 public key를 사용
  • sudo : 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램
  • apt-get : 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는 프로그램 (apt-get update, apt-get install)
  • su : substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용
  • vi : 텍스트 에디터. https://withcoding.com/112

Airflow 기본 프로그램 실행

Airflow 코드의 기본 구조

  • DAG 대표하는 객체를 먼저 만듬
    - DAG 이름, 실행주기, 실행날짜, 오너 등등
  • 다음으로 DAG를 구성하는 태스크들을 만듬
    - 태스크별로 적합한 오퍼레이터를 선택
    - 태스크 ID를 부여하고 해야할 작업의 세부사항 지정
  • 최종적으로 태스크들간의 실행 순서를 결정

DAG 설정 예

from datetime import datetime, timedelta

default_args = {
 'owner': 'keeyong',
 'email': ['keeyonghan@hotmail.com'],
 'retries': 1,
 'retry_delay': timedelta(minutes=3),
}
  • 여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됨(default_args)
  • 뒤에서 DAG 객체를 만들 때 지정함

retries : 태스크가 실패하면 재시도 할지 말지, 한다면 몇 번 할지
retry_delay : 재시도시 얼마나 기다릴지

그 외
on_failure_callback : 실패 시 호출할 함수
on_success_callback : 성공 시 호출할 함수

from airflow import DAG

dag = DAG(
 "dag_v1", # DAG name
 start_date=datetime(2020,8,7,hour=0,minute=00),
 schedule="0 * * * *",
 tags=["example"],
 catchup=False,
 # common settings
 default_args=default_args
)

start_date : 시작 날짜
catchup : start_date부터 활성화 시점까지 실행 안된 날짜에 대해서 실행하려 함
full refresh 때는 false로
schedule="0 * * * *" : 매시 0분에 시작

schedule

None, @once, @hourly, @daily, @weekly, @monthly, @yearly 등으로 세팅 가능

Bash Operator를 사용한 예

  • 3개의 태스크로 구성
  • t1은 현재 시간 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
 'owner': 'keeyong',
 'start_date': datetime(2023, 5, 27, hour=0, minute=00),
 'email': ['keeyonghan@hotmail.com'],
 'retries': 1,
 'retry_delay': timedelta(minutes=3),
}

test_dag = DAG(
 "dag_v1", # DAG name
 schedule="0 9 * * *",
 tags=['test'],
 catchUp=False,
 default_args=default_args
)


t1 = BashOperator(
 task_id='print_date',
 bash_command='date',
 dag=test_dag)
 
t2 = BashOperator(
 task_id='sleep',
 bash_command='sleep 5',
 dag=test_dag)
 
t3 = BashOperator(
 task_id='ls',
 bash_command='ls /tmp',
 dag=test_dag)
 
t1 >> [ t2, t3 ]

DAG 실행 - UI

DAG 실행 - Terminal

Airflow 서버에 로그인하고 다음 명령 실행

  • airflow dags list : 대그 목록
  • airflow tasks list DAG이름 : 해당 대그의 태스크 목록
  • airflow tasks test DAG이름 Task이름 날짜 # test vs. run
    - 날짜는 YYYY-MM-DD
         start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
         이게 바로 execution_date의 값이 됨

0개의 댓글