DAG stands for Directed Acyclic Graph.
from airflow.models import DAG
from datetime import datetime

default_arguments = {
    'owner': 'jee',
    'start_date': datetime(2022, 4, 10)
}

etl_dag = DAG('etl_workflow', default_args=default_arguments)
airflow -h : description of the available commands
airflow list_dags : show all recognized DAGs

command line | Python |
---|---|
Start Airflow processes | Create a DAG |
Manually run DAGs / Tasks | Edit the individual properties of a DAG |
Get logging info from Airflow | |
from airflow.operators.bash_operator import BashOperator
BashOperator(
    task_id='bash_example',
    bash_command='echo "Example!"',
    dag=ml_dag
)
bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=dag)
# chained dependencies
task1 >> task2 >> task3 >> task4
# Mixed dependencies
task1 >> task2 << task3
# OR
task1 >> task2
task3 >> task2
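The task1..task4 names above are placeholders; a minimal self-contained sketch of the chained form, assuming DummyOperator tasks and an already-defined dag object:

from airflow.operators.dummy_operator import DummyOperator

# Placeholder tasks used only to illustrate the bitshift syntax
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)

# task1 runs first, then task2, then task3, then task4
task1 >> task2 >> task3 >> task4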
# Add another Python task
from airflow.operators.python_operator import PythonOperator

parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile': 'latestsales.json', 'outputfile': 'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)
schedule_interval : how often to run the DAG, between the start_date and end_date
The first scheduled run happens at start_date + schedule_interval

'start_date' : datetime(2020, 2, 25)
'schedule_interval' : '@daily'
# with these settings, the first run happens on 2020-02-26
# Update the scheduling arguments as defined
default_args = {
    'owner': 'Engineering',
    'start_date': datetime(2019, 11, 1),
    'email': ['airflowresults@datacamp.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=20)
}
# 12:30 pm every Wednesday
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
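For reference, the common schedule_interval presets map to cron expressions as follows (a quick sketch; the DAG id below is illustrative):

# '@hourly'  -> '0 * * * *'
# '@daily'   -> '0 0 * * *'
# '@weekly'  -> '0 0 * * 0'
# '@monthly' -> '0 0 1 * *'
# None       -> never scheduled automatically; only runs when triggered manually
presets_dag = DAG('example_presets', default_args=default_args, schedule_interval='@daily')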
mode : how to check for the condition
mode='poke' : the default, run repeatedly
mode='reschedule' : give up the task slot and try again later
poke_interval : how long to wait between checks
timeout : how long to wait before failing the task
FileSensor : checks for the existence of a file at a certain location
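A minimal sketch combining these sensor arguments (the file path, task id, and interval values are illustrative):

from airflow.contrib.sensors.file_sensor import FileSensor

# Check for the file every 60 seconds, free the worker slot between
# checks, and fail the task if the file hasn't appeared after one hour.
wait_for_file = FileSensor(task_id='wait_for_file',
                           filepath='data_ready.txt',
                           mode='reschedule',
                           poke_interval=60,
                           timeout=3600,
                           dag=dag)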
SequentialExecutor
LocalExecutor
CeleryExecutor
What is Celery?
Celery: an asynchronous task/job queue that runs on distributed message passing
task > worker becomes task > broker > worker
refs: https://jonnung.dev/python/2018/12/22/celery-distributed-task-queue/
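Which executor Airflow uses is set in airflow.cfg; a minimal sketch, assuming a default install:

# in airflow.cfg, under the [core] section
executor = SequentialExecutor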
SLA Miss : any time the task/DAG does not meet the expected timing, e.g. sla=timedelta(seconds=30)
# Import the timedelta and datetime objects
from datetime import timedelta, datetime

test_dag = DAG('test_workflow', start_date=datetime(2020, 2, 20), schedule_interval=None)

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)
BranchPythonOperator
def branch_test(**kwargs):
    # ds_nodash is the execution date as a 'YYYYMMDD' string
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2
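The start/even/odd tasks referenced above aren't defined in this snippet; a minimal sketch, assuming DummyOperator placeholders. Whichever branch branch_test does not return is skipped.

from airflow.operators.dummy_operator import DummyOperator

start_task = DummyOperator(task_id='start_task', dag=dag)
even_day_task = DummyOperator(task_id='even_day_task', dag=dag)
even_day_task2 = DummyOperator(task_id='even_day_task2', dag=dag)
odd_day_task = DummyOperator(task_id='odd_day_task', dag=dag)
odd_day_task2 = DummyOperator(task_id='odd_day_task2', dag=dag)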
airflow run <dag_id> <task_id> <date> : run a specific task
airflow trigger_dag -e <date> <dag_id> : run a full DAG
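For example, using the test_workflow DAG and first_task defined above (the date here is illustrative):

airflow run test_workflow first_task 2020-02-21
airflow trigger_dag -e 2020-02-21 test_workflow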
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime
# Update the default arguments and apply them to the DAG
default_args = {
    'start_date': datetime(2019, 1, 1),
    'sla': timedelta(minutes=90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file',
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles',
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing',
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)
sensor >> bash_task >> python_task
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta
# Update the default arguments and apply them to the DAG.
default_args = {
    'start_date': datetime(2019, 1, 1),
    'sla': timedelta(minutes=90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file',
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles',
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing',
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)
email_subject="""
Email report for {{ params.department }} on {{ ds_nodash }}
"""
email_report_task = EmailOperator(task_id='email_report_task',
                                  to='sales@mycompany.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)
def check_weekend(**kwargs):
    # 'ds' is the execution date as a 'YYYY-MM-DD' string
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    else:
        return 'no_email_task'
branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   python_callable=check_weekend,
                                   provide_context=True,
                                   dag=dag)
sensor >> bash_task >> python_task
python_task >> branch_task >> [email_report_task, no_email_task]
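One way to check the branch logic without waiting for a scheduled run is airflow test, which runs a single task for a given execution date (the date here is illustrative; 2019-01-05 is a Saturday, so check_weekend should pick no_email_task):

airflow test etl_update check_if_weekend 2019-01-05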