[Airflow] Bash Operator

minyeamer·2025년 5월 30일
0

Apache Airflow 배우기

목록 보기
3/13
post-thumbnail

Operator

  • 특정 행위를 할 수 있는 기능을 모아 놓은 클래스
    • Task : Operator를 객체화하여 DAG에서 실행 가능한 오브젝트
  • Bash Operator : 쉘 스크립트 명령을 수행하는 Operator
  • Python Operator : 파이썬 함수를 실행하는 Operator

개발 환경 설정

Git & Github

% git init
% git add .gitignore
% git commit -m "init"
% git remote add origin https://github.com/<username>/<repository>
% git push
  • Git 설치 및 Github에 리포지토리를 생성
  • 로컬 Airflow 설치 경로에서 Git 저장소를 생성하고 원격 저장소와 연결
  • Airflow에서 제공하는 .gitignore 를 로컬 Airflow 설치 경로에 복제하여 불필요한 /logs 경로 등을 Git에서 제외

https://github.com/apache/airflow/blob/main/.gitignore

디렉토리 구조

% tree -a -F .             
./
├── .env
├── .git/
├── .gitignore
├── config/
│   └── airflow.cfg
├── dags/
├── docker-compose.yaml
├── logs/
└── plugins/

Example DAGs 삭제

# docker-compose.yaml

AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  • 기본 설정으로 Airflow 실행 시 거슬리는 Example DAG를 삭제
  • 컨테이너를 재시작해도 여전히 예제가 남아있어 airflow db reset 등 여러가지 초기화 방법을 탐색했지만, 결과적으로 localhost:8080 에 대한 크롬 브라우저 캐시를 삭제하니 해결
    remove-examples

BashOperator

https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/bash.html

  • dags/ 경로 아래에 bash_operator.py 파일을 생성
  • Airflow에서 제공하는 예시 example_bash_operator 를 복제 및 일부를 수정하여 DAG 선언
# dags/bash_operator.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "bash"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        bash_command="echo whoami",
    )

    bash_task2 = BashOperator(
        task_id="bash_task2",
        bash_command="echo $HOSTNAME",
    )

    bash_task1 >> bash_task2

DAG 선언

  • dag_id : UI에서 표시되는 DAG 명칭, 관리의 용이성을 위해 파일 명칭과 동일하게 지정
  • schedule : 크론탭(Crontab)을 사용하여 스케줄 지정
  • start_date : 작업 시작일을 지정, 예시에는 시간대가 UTC로 적용되어 Asia/Seoul로 변경
  • catchup : 시작일(start_date)부터 현재까지 실행되지 않은 작업을 일괄로 실행 (True)
  • dagrun_timeout : DAG 실행 시간을 제한
  • tags : UI에서 DAG를 구별하기 위한 태그 지정

BashOperator 활용

  • BashOperator 를 사용하여 두 개의 bash 명령어를 실행하는 Task를 정의
  • task_id : UI에서 표시되는 Task 명칭, 관리의 용이성을 위해 객체명과 동일하게 지정
  • bash_command : 실행할 bash 명령어
  • Task 간 >> 기호로 연결하여 종속성을 표시
    • 여러 개의 Task를 묶고 싶을 때는 배열을 사용 (task1 >> [task2, task3])
    • 종속성은 연속해서 표시 가능 (task1 >> task2 >> task3)

https://github.com/apache/airflow/blob/providers-standard/1.2.0/providers/standard/tests/system/standard/example_bash_operator.py

DAG 등록

  • Airflow 컨테이너를 중지 후 다시 실행해 UI에서 DAG가 올라온 것을 확인
    bash-operator-dag

DAG 실행

  • DAG를 실행하여 정상적으로 수행되는지 확인
  • DAG를 클릭해서 이동하는 페이지에서 bash_taskbash_task2 의 관계를 그래프로 조회 가능
    bash-operator-task

로그 조회

  • bash_task1bash_task2 에 대해 각각의 로그를 확인
  • bash_task1 에는 echo whoami 명령어가 전달되어 whoami 를 결과로 출력
  • bash_task2 에는 echo $HOSTNAME 명령어가 전달되어 컨테이너의 HOSTNAME 을 출력
# bash_task1

[2025-05-30, 00:16:56] INFO - Running command: ['/usr/bin/bash', '-c', 'echo whoami']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:56] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:56] INFO - whoami: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
# bash_task2

[2025-05-30, 00:16:57] INFO - Running command: ['/usr/bin/bash', '-c', 'echo $HOSTNAME']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:57] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:57] INFO - wha439072926: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
profile
데이터의 모든 것을 추구합니다.

0개의 댓글