DAG 생성(BASH)

우상욱·2024년 3월 19일

Airflow Master Class

목록 보기
3/24

Airflow DAG 생성(Bash Operator)

  • 오퍼레이터
    특정행위를 할 수 있는 기능을 모아 놓은 클래스, 설계도

    • Bash 오퍼레이터
      쉘 스크립트 명령을 수행하는 오퍼레이터
    • Python Operator
      파이썬 코드를 실행시켜주는 오퍼레이터
    • S3
      아마존 S3 관련 오퍼레이터
    • GCS
      구글 클라우드의 오브젝트 스토리지 관련 오퍼레이터
  • Task
    오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트

Task의 수행 주체

  • 스케줄러
    스케줄러는 DAG 파일을 읽어들이고, TASK들에 문제가 없는지 확인합니다. 이후 확인한 정보들을 메타 DB에 넣고, Start 시간을 확인해서 이를 워커에 시작을 지시합니다. 에어플로우의 뇌라고 이해하면 좋습니다.
    • DAG Parsing 후 DB에 정보 저장
    • DAG 시작시간 결정
  • 워커
    • 실제 작업 수행

실습

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum

with DAG(
    # dag 파일명과 dag_id를 일치시키는 것이 좋습니다.
    dag_id="dags_bash_operator",
    
    # 분 시 일 월 요일
    schedule="0 0 * * *",
    
    # dag가 언제부터 시작 할 건지 결정
    # utc는 세계 표준시라고 부르고, 한국보다 9시간 느리다.
    # Asia/Seoul은 한국 시간으로
    start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Seoul"),
    
    # catchup 파라미터
    # catchup 변수가 True면 시작 시간부터 오늘까지 그 사이에 누락된 구간을 전부 한꺼번에 돌립니다.
    # 일반적으로는 catchup 변수는 False로 두는 것이 좋습니다.
    catchup=False,
    
    # dagrun_timeout은 이 dag가 60분 이상 지나면, dag를 종료한다는 것입니다.
    # dagrun_timeout=datetime.timedelta(minutes=60),
    
    # tag들은 DAG 들을 모아서 보기 좋게 하기 위해서 설정하는 것입니다.
    # tags=["example", "example2"],
) as dag:
    # bash_t1은 태스크명
    bash_t1 = BashOperator(
        # 이 task_id는 graph에 나오는 이름
        # 이 아이디는 변수명과 사실 관계는 없지만,
        # 객체명과 동일하게 줘야 찾기 편합니다.
        task_id="bash_t1",
        bash_command="echo whoami",
    )

    bash_t2 = BashOperator(
        task_id="bash_t2",
        bash_command="echo $HOSTNAME",
    )

    # 태스크들의 실행 순서
    bash_t1 >> bash_t2

DAG 실행 및 확인


DAG 실행해보기

TASK 확인

TASK를 실제로 확인하려면, dags_bash_operator DAG를 클릭해서, Grid 탭의 왼쪽을 봅니다. 해당 task별로 나와있는 그래프를 확인할 수 있고, 각 네모를 눌러주면, started_date를 확인할 수 있습니다.

또한 로그도 확인할 수 있습니다.

실제 실행 컨테이너 확인

앞서 bash_t2 태스크에서 echo $HOSTNAME을 통해서, hostname을 확인했습니다. 해당 로그를 먼저 확인해보면, 이렇게 나와있는 것을 볼 수 있습니다.

앞서 말했듯이, 실제 TASK를 처리하는 역할은 Worker가 진행한다고 했습니다. 그럼 worker 컨테이너의 실제 HOST를 보고 일치하는지 확인해보겠습니다.

docker exec -it airflow-airflow-worker-1 /bin/bash
echo $HOSTNAM

이를 통해서, airflow의 실제 task를 수행하는 주체는 worker인 것을 확인할 수 있습니다. 또한 추가적으로, 도커 컨테이너의 컨테이너 ID는 host 이름이 되는 것 또한 확인할 수 있습니다.

profile
데이터엔지니어

0개의 댓글