airflow dag

yoon__0_0·2024년 6월 17일
0

이어드림 수업

목록 보기
62/103

dag 이란

  • Directred Acyclic Graph
  • 방향성 있는 비순환 그래프
  • 실행할 작업들을 순서에 맞게 구성한 워크플로우
  • dag을 구성하는 각 작업들을 task라고 부름
  • 분기 실행과 병렬실행이 가능함.

dag 주의사항

  • dag id와 python 파일 이름은 보통 동일하게 생성하기
  • task_id 의 값과 task 의 변수 값도 동일하게 생성하기
  • bash shell 이용시 실행 권한을 줘야함

airflow dag 만드는 방법

with 문 이용하기

  • 가장 많이 사용함.
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2024, 6, 1, tz="Asia/Seoul"),
    catchup=True
) as dag:
    bash_t1 = BashOperator(
        task_id="bash_t1",
        bash_command="echo whoami",
    )

    bash_t2 = BashOperator(task_id="bash_t2",
                           bash_command="echo $HOSTNAME", )

    bash_t1 >> bash_t2
  • bash shell 이용하기
#!/bin/bash

FRUIT=$1
if [ $FRUIT == APPLE ];then
	echo "You selected Apple!"
elif [ $FRUIT == ORANGE ];then
	echo "You selected Orange!"
elif [ $FRUIT == GRAPE ];then
	echo "You selected Grape!"
else
	echo "You selected other Fruit!"
fi
  • python 파일
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_select_fruit",
    schedule="10 0 * * 6#1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False
) as dag:
    t0_grant_permission = BashOperator(
        task_id="t0_grant_permission",
        bash_command="chmod +x /opt/airflow/plugins/shell/select_fruit.sh ",
    )
    t1_orange = BashOperator(
        task_id="t1_orange",
        bash_command="/opt/airflow/plugins/shell/select_fruit.sh ORANGE",
    )

    t2_avocado = BashOperator(
        task_id="t2_avocado",
        bash_command="/opt/airflow/plugins/shell/select_fruit.sh AVOCADO",
    )

    t0_grant_permission >> [t1_orange >> t2_avocado]

standard 방식

  • 가장 직관적인 방식
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator

my_dag = DAG(
    dag_id="dags_bash_operator_standard",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    schedule="0 9 * * 1,5",
    tags=['homework']
)

bash_hello = BashOperator(
    task_id="bash_hello",
    bash_command="echo hello >> /opt/airflow/test_dif/test.txt",
    dag=my_dag
)

bash_world = BashOperator(
    task_id="bash_world",
    bash_command="echo world >> /opt/airflow/test_dif/test.txt ",
    dag=my_dag
)


bash_hello >> bash_world

decorator 방식

  • 데코레이터 방식
  • 많이 사용되지는 않음
from airflow.decorators import dag
import pendulum
from airflow.operators.bash import BashOperator


@dag(start_date=pendulum.datetime(2024, 5, 1, tz="Asia/Seoul")
     , schedule="0 13 * * 5#2"
     , tags=['homework']
     , catchup= False)
def dags_bash_operator_decorator():
    int_task = BashOperator(
        task_id="int_task",
        bash_command="bash /opt/airflow/plugins/shell/is_equal.sh 1 2 3"
    )

    str_task = BashOperator(
        task_id="str_task",
        bash_command="bash /opt/airflow/plugins/shell/is_equal.sh a a b",
    )

    int_task >> str_task


dags_bash_operator_decorator()
profile
신윤재입니다

0개의 댓글