dag 이란
- Directred Acyclic Graph
- 방향성 있는 비순환 그래프
- 실행할 작업들을 순서에 맞게 구성한 워크플로우
- dag을 구성하는 각 작업들을 task라고 부름
- 분기 실행과 병렬실행이 가능함.
data:image/s3,"s3://crabby-images/94bab/94bab85bc4d4ba6862ed71a7b41b30b8cd6727c4" alt=""
dag 주의사항
- dag id와 python 파일 이름은 보통 동일하게 생성하기
- task_id 의 값과 task 의 변수 값도 동일하게 생성하기
- bash shell 이용시 실행 권한을 줘야함
airflow dag 만드는 방법
with 문 이용하기
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2024, 6, 1, tz="Asia/Seoul"),
catchup=True
) as dag:
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo whoami",
)
bash_t2 = BashOperator(task_id="bash_t2",
bash_command="echo $HOSTNAME", )
bash_t1 >> bash_t2
#!/bin/bash
FRUIT=$1
if [ $FRUIT == APPLE ];then
echo "You selected Apple!"
elif [ $FRUIT == ORANGE ];then
echo "You selected Orange!"
elif [ $FRUIT == GRAPE ];then
echo "You selected Grape!"
else
echo "You selected other Fruit!"
fi
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_select_fruit",
schedule="10 0 * * 6#1",
start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
catchup=False
) as dag:
t0_grant_permission = BashOperator(
task_id="t0_grant_permission",
bash_command="chmod +x /opt/airflow/plugins/shell/select_fruit.sh ",
)
t1_orange = BashOperator(
task_id="t1_orange",
bash_command="/opt/airflow/plugins/shell/select_fruit.sh ORANGE",
)
t2_avocado = BashOperator(
task_id="t2_avocado",
bash_command="/opt/airflow/plugins/shell/select_fruit.sh AVOCADO",
)
t0_grant_permission >> [t1_orange >> t2_avocado]
standard 방식
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
my_dag = DAG(
dag_id="dags_bash_operator_standard",
start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
schedule="0 9 * * 1,5",
tags=['homework']
)
bash_hello = BashOperator(
task_id="bash_hello",
bash_command="echo hello >> /opt/airflow/test_dif/test.txt",
dag=my_dag
)
bash_world = BashOperator(
task_id="bash_world",
bash_command="echo world >> /opt/airflow/test_dif/test.txt ",
dag=my_dag
)
bash_hello >> bash_world
decorator 방식
from airflow.decorators import dag
import pendulum
from airflow.operators.bash import BashOperator
@dag(start_date=pendulum.datetime(2024, 5, 1, tz="Asia/Seoul")
, schedule="0 13 * * 5#2"
, tags=['homework']
, catchup= False)
def dags_bash_operator_decorator():
int_task = BashOperator(
task_id="int_task",
bash_command="bash /opt/airflow/plugins/shell/is_equal.sh 1 2 3"
)
str_task = BashOperator(
task_id="str_task",
bash_command="bash /opt/airflow/plugins/shell/is_equal.sh a a b",
)
int_task >> str_task
dags_bash_operator_decorator()