A DAG can be declared in three ways.

With a context manager:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG("dag_id1", start_date=days_ago(2), schedule_interval=None) as dag:
    op = DummyOperator(task_id="task_id1")

With the standard constructor:

first_dag = DAG("dag_id2", start_date=days_ago(2), schedule_interval=None)
op = DummyOperator(task_id="task_id2", dag=first_dag)

With the @dag decorator:

from airflow.decorators import dag

@dag(start_date=days_ago(2), schedule_interval=None)
def generate_dag():
    op = DummyOperator(task_id="task_id3")

dag = generate_dag()
Use the << and >> (bitshift) operators:
first_task >> [second_task, third_task]
third_task << fourth_task
Use the set_upstream() and set_downstream() methods:
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
Use chain():
from airflow.models.baseoperator import chain
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
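chain() can also pair two lists of equal length element-wise, one edge per pair, which differs from cross_downstream() below, where every upstream connects to every downstream:

from airflow.models.baseoperator import chain

# Replaces
# op1 >> op3
# op2 >> op4
chain([op1, op2], [op3, op4])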
Use cross_downstream() to express more complex (many-to-many) dependencies:
from airflow.models.baseoperator import cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
Airflow imports each Python source file in DAG_FOLDER, executes it, and loads the DAG objects it defines:

dag_1 = DAG('loaded')

def my_func():
    dag_2 = DAG('not_loaded')

my_func()
In the example above, dag_1 is loaded because it is a DAG instance at the top level (in globals()), but dag_2 is not.
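If DAGs built inside a function should still be discovered, a common pattern is to assign them into the module's globals() so they become top-level objects. A minimal sketch (the generated dag_ids here are hypothetical):

from airflow import DAG
from airflow.utils.dates import days_ago

def create_dag(dag_id):
    return DAG(dag_id, start_date=days_ago(2), schedule_interval=None)

# Assigning into globals() makes each DAG a top-level object,
# so the scheduler discovers it when this file is parsed.
for i in range(3):
    dag_id = f"generated_dag_{i}"  # hypothetical ids
    globals()[dag_id] = create_dag(dag_id)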
A .airflowignore file inside DAG_FOLDER can list files and subfolders that the DAG loader should skip.
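For instance, an .airflowignore containing the two hypothetical patterns below makes the loader skip any matching path; by default, each line is interpreted as a regular expression matched against the file path:

project_a
tenant_[\d]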
To declare arguments shared by every task in the DAG (start_date, for example), pass them to the DAG as default_args:

default_args = {
    'start_date': datetime(2022, 1, 1),
    'owner': 'airflow'
}

with DAG('dag_id', default_args=default_args) as dag:
    op = DummyOperator(task_id='dummy')
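Entries in default_args are applied to each operator at construction time, so a quick check (assuming the snippet above) confirms the inherited value:

print(op.owner)  # -> 'airflow', inherited from default_args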
BranchPythonOperator is similar to PythonOperator (it inherits from it), except that the function passed as python_callable returns the task_id, or list of task_ids, that the run should follow:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
default_args = {
    'start_date': datetime(2022, 1, 15),
    'owner': 'Airflow'
}

# catchup is a DAG-level argument, so it belongs on the DAG itself, not in default_args
with DAG('control_flow', schedule_interval=None, catchup=False, default_args=default_args) as dag:
    def branch_func(ti):
        xcom_value = int(ti.xcom_pull(task_ids="start_task"))
        if xcom_value >= 5:
            return "continue_task"
        else:
            return "stop_task"

    start_op = BashOperator(
        task_id="start_task",
        bash_command="echo 5",
        do_xcom_push=True,
    )

    branch_op = BranchPythonOperator(
        task_id="branch_task",
        python_callable=branch_func,
    )

    continue_op = DummyOperator(task_id="continue_task")
    stop_op = DummyOperator(task_id="stop_task")

    start_op >> branch_op >> [continue_op, stop_op]
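On Airflow 2.3 or later, the same branch can be written with the TaskFlow @task.branch decorator; a sketch reusing the task names above, placed inside the same with DAG(...) block:

from airflow.decorators import task

@task.branch(task_id="branch_task")
def branch_func(ti=None):
    # Same decision as above: pick the downstream task_id from the XCom value.
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    return "continue_task" if xcom_value >= 5 else "stop_task"

branch_op = branch_func()
start_op >> branch_op >> [continue_op, stop_op]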
To skip tasks on runs that are not the most recent schedule interval, use LatestOnlyOperator:

import datetime as dt
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2021, 1, 1),
    catchup=False,
    tags=['example3'],
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = DummyOperator(task_id='task1')
    task2 = DummyOperator(task_id='task2')
    task3 = DummyOperator(task_id='task3')
    task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]
task3 uses the default trigger rule (ALL_SUCCESS), so when the run is not the latest one, task1 is skipped and task3 is skipped along with it; task4 still runs because its trigger_rule is set to ALL_DONE.

Two related knobs for controlling when a task runs: set depends_on_past to True so a task instance runs only if its own previous run succeeded (or was skipped), or set trigger_rule=something to change how a task reacts to its upstream states; a sketch of both follows.
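A minimal sketch of both settings, with hypothetical dag_id and task names (daily_load and alert are not from the original example):

import datetime as dt

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id='control_knobs_example',  # hypothetical dag_id
    schedule_interval='@daily',
    start_date=dt.datetime(2022, 1, 1),
) as dag:
    # Runs only if this task's own previous scheduled run succeeded (or was skipped).
    daily_load = DummyOperator(task_id='daily_load', depends_on_past=True)

    # Fires as soon as any upstream task fails, instead of the default ALL_SUCCESS.
    alert = DummyOperator(task_id='alert', trigger_rule=TriggerRule.ONE_FAILED)

    daily_load >> alert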
으로 설정되었으므로 실행depends_on_past
를 True
로 설정trigger_rule
=something
으로 설정with DAG(
    dag_id='Documentation_Example',
    schedule_interval=None,
    start_date=dt.datetime(2022, 1, 15),
    catchup=False,
) as dag:
    dag.doc_md = """
    # DAG Documentation
    DAG Explanation
    """

    task_documentation_example = DummyOperator(task_id='task_documentation_example')
    task_documentation_example.doc_md = """\
    # Task Documentation
    Task Explanation
    """
References
https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
Udemy - The Complete Hands-on Introduction to Apache Airflow