- Aiflow의 Task를 모으는 core concept이다.
- Tasks는 A,B,C,D 이다. run 시 해당 task 들은 의존성을 갖고 순서대로 수행된다.
- 내일부터 5분 마다, 새해 첫날 마다 등으로 수행할 수도 있다.
- DAG는 task에서 어떤 작업을 수행하는지 신경쓰지 안흔ㄴ다. 오직 어떻게 그것들이 실행될 것인지에만 신경쓴다.
- 어떤 순서로, 얼마나 재수행할지, timeout은 등
Declaring a DAG
- DAG를 declaring 하는 세가지 방법이있다.
with DAG(
"my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="@daily", catchup=False
) as dag:
op = EmptyOperator(task_id="task")
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="@daily", catchup=False)
op = EmptyOperator(task_id="task", dag=my_dag)
@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="@daily", catchup=False)
def generate_dag():
op = EmptyOperator(task_id="task")
dag = generate_dag()
- @dag를 통한 방법
- DAG generator로 function을 turon 해준다.
Task Dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
- Task/Operator는 보통 혼자 있지않고, 서로 의존성을 가진다.
- Task들 사이 의존성을 선언하는 방법은 DAG 구조로 만드는 것이다.
first_task.set_downstream(second_task, third_task)
third_task.set_upstream(fourth_task)
- 해당 method를 통해서 명시적으로 적용하는 방법도 있다.
from airflow.models.baseoperator import cross_downstream
cross_downstream([op1, op2], [op3, op4])
- cross_downstream이라는 shortcut으 사용하면 위의 task를 쉽게 대체할 수 있다.
from airflow.models.baseoperator import chain
chain(op1, op2, op3, op4)
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])
- chain을 사용해서 의존성을 함께 묶는 shortcut도 있다.
from airflow.models.baseoperator import chain
chain(op1, [op2, op3], [op4, op5], op6)
Loading DAGs
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
- aiflow는 python source 파일에서 DAGs를 load 한다. 이는 DAG_FOLDER에 설정되어있다.
- 이는 해당 파일로 어떠한 DAG 객체도 load 하고 실행할 수 있다는 것을 의미한다.
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
- load되는 순서에 유의해야되는데, 위처럼 코드를 작성했을 경우 dag_1이 적용되고 dag_2는 load되지 않는다. dag_1이 top level에 있는 것이다.
Running DAGs
- 2가지 방법이 있다.
- API로 수동으로 triggering
- schdule을 정의
with DAG("my_daily_dag", schedule_interval="@daily"):
...
- schedule_interval argument를 통해 스케줄을 정의할 수 있다.
with DAG("my_daily_dag", schedule_interval="0 * * * *"):
...
DAG Assignment
- 명시적으로 Task/Operator를 pass 하지 않아도 되는몇가지 방법이 있다.
- with DAG block안에 Operator를 선언
- @dag decordator로 Operator 선언
- DAG를 갖고 있는 Operator의 upstream, downstream에 Operator를 넣는다.
Default Arguments
- retires와 같은 default argument는 대부분 공통 set으로 많이 쓴다.
- defulat_ags를 통해 이러한 과정을 진행할 수 있다.
import pendulum
with DAG(
dag_id='my_dag',
start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
schedule_interval='@daily',
catchup=False,
default_args={'retries': 2},
) as dag:
op = BashOperator(task_id='dummy', bash_command='Hello World!')
print(op.retries)
The DAG decorator
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
)
def example_dag_decorator(email: str = 'example@example.com'):
"""
DAG to send server IP to email.
:param email: Email to send IP to. Defaults to example@example.com.
"""
get_ip = GetRequestOperator(task_id='get_ip', url="http://httpbin.org/get")
@task(multiple_outputs=True)
def prepare_email(raw_json: Dict[str, Any]) -> Dict[str, str]:
external_ip = raw_json['origin']
return {
'subject': f'Server connected from {external_ip}',
'body': f'Seems like today your server executing Airflow is connected from IP {external_ip}<br>',
}
email_info = prepare_email(get_ip.output)
EmailOperator(
task_id='send_email', to=email, subject=email_info['subject'], html_content=email_info['body']
)
dag = example_dag_decorator()
- @dag decordator는 DAG generator function으로 바꿔준다.
- 코드가 깔끔해지고 DAG 파라미터로 function에 setup 해줄 수 도 있다.
- Jinja template을 통해 Python Code나 {{ }} 형태로 파라미터를 접근할 수 있게된다.
Control Flow
- Branching
- 선택한 Task를 상황에 따라 이동시킬 수 있다.
- Latest Only
- Depnds On Past
- Trigger Rules
- DAG가 run할 Taskㄹ르 상황에 따라 지정해준다.
branching
- 모든 곳에 의존성이 없는 task를 branching 할 수 있다.
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
xcom_push=True,
dag=dag,
)
branch_op = BranchPythonOperator(
task_id="branch_task",
python_callable=branch_func,
dag=dag,
)
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]