Airflow Study4- Bash Operator & 외부 쉘파일

박성현·2024년 5월 28일

Airflow

목록 보기
10/28

why 쉘 스크립트 ?

복잡한 로직을 처리하는 경우
ex )
1. sftp를 통해 파일을 받은 후 DB에 insert & tar.gz로 압축
2. 쉘 명령어 재사용 가능 ( like db insert 시 서버 정보만 변수로 받고 insert 하는 부분은 쉘로 재사용가능)

worker 컨테이너가 쉘 스크립트 수행 ?

issus :
1. 컨테이너는 외부 파일을 인식할 수 없다.
2. 컨테이너 안에 파일을 만들어주면 컨테이너 재시작시 파일이 사라진다.
=> git을 통한 dags폴더를 sharing 과 동일한 방식으로, 서버와 컨테이너 볼륨을 연결한다(docker_compose.ymal)
외부 .py 파일과 shell 파일은 plugins은 폴더에 저장하는것을 공식 문서에서 추천

참고
$1 은 쉘스크립에 첫번째 파라미터 값을 나타냄


select_fruit.sh



FRUIT=$1
if [ $FRUIT == APPLE ]; then
        echo "You selected Apple!"
elif [ $FRUIT == ORAGNE ]; then
        echo "You selected Oragne!"
elif [ $FRUIT == GRAPE ]; then
        echo "You selected Grape!"
else
        echo "You selected other Fruit!"
fi


dags_bash_select_fruit.py

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum

with DAG (
    dag_id = "dags_bash_select_fruit",
    schedule= "10 0 * * 6#1", # 0 시 10분 첫번째주 토요일 
    start_date= pendulum.datetime(2024,3,1 , tz="Asia/Seoul"),
    catchup= False
) as dag:
    
    t1_orange = BashOperator(
        task_id = "t1_orage",
        bash_command="/opt/airflow/plugins/shell/select_fruit.sh ORANGE"
    )
    
    t2_avocado = BashOperator(
        task_id = "t2_avocado",
        bash_command="/opt/airflow/plugins/shell/select_fruit.sh AVOCADO"
    )
    
    t1_orange >> t2_avocado

참고 Unpause 시 수행여부

매월 4일 스케쥴 걸어놓음
start_date= pendulum.datetime(2024,3,1 , tz="Asia/Seoul"),
위와 같이 시작일이 3/1이면
다음 수행 날짜는 3/4일이 될것임 (Next Run ~ (우상단))
이전 수행 날짜는 2/4일이 었을것이고

2/4 ~ (Case1) ~ 3/4 ~ (Case2)

오늘 날짜가가 (Case1) or (Case2) 인지에 따라 unpause시 자동으로 한번 돌지 말지 결정됨
Case1 : 한번 수행됨
Case2 : unpause해도 수행 안됨
-> catchup False로 줬기 때문

profile
다소Good한 데이터 엔지니어

0개의 댓글