Introduction to Airflow in Python


Quick introduction to DAGs

DAG stands for Directed Acyclic Graph

  • In Airflow, a DAG represents the set of tasks that make up your workflow
  • Consists of the tasks and the dependencies between tasks
  • Created with various details about the DAG (name, start date, owner, etc.)

Airflow DAGs

What is a DAG?

  • Directed Acyclic Graph
  • Directed, there is an inherent flow representing dependencies between components
  • Acyclic, does not loop/cycle/repeat
  • Graph, the actual set of components

  • Written in Python
  • Made up of components (tasks) to be executed, such as operators and sensors
  • Contain dependencies defined explicitly or implicitly
from airflow.models import DAG
from datetime import datetime

default_arguments = {
    'owner': 'jee',
    'start_date': datetime(2022, 4, 10)
}

etl_dag = DAG('etl_workflow', default_args=default_arguments)

DAGs on the command line

  • airflow -h : show help and a description of available subcommands
  • airflow list_dags : list all recognized DAGs

Command line vs. Python

Command line:
  • Start Airflow processes
  • Manually run DAGs / Tasks
  • Get logging info from Airflow

Python:
  • Create a DAG
  • Edit the individual properties of a DAG

Operators

  • Represent a single task in a workflow
  • Run independently (usually)
  • Generally do not share information with each other
  • Various operators exist to perform different tasks

BashOperator

from airflow.operators.bash_operator import BashOperator

BashOperator(
    task_id='bash_example',
    bash_command='echo "Example!"',
    dag=ml_dag
)

# the awk pattern stays in double quotes inside the single-quoted command
bash_task = BashOperator(
    task_id='clean_addresses',
    bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
    dag=dag
)

Tasks

Task dependencies

  • Tasks are instances of operators, referenced by their task_id
  • Dependencies define a given order of task completion, set with the bitshift operators: >> (upstream, runs before) and << (downstream, runs after)

Multiple dependencies

# chained dependencies
task1 >> task2 >> task3 >> task4

# Mixed dependencies: task2 runs after both task1 and task3
task1 >> task2 << task3

# OR, equivalently
task1 >> task2
task3 >> task2

Example of Python task

from airflow.operators.python_operator import PythonOperator

# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)
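
For context, a minimal sketch of what the parse_file callable could look like, matching the op_kwargs above (the real function is defined elsewhere in the course; this version simply assumes the input is JSON):

import json

def parse_file(inputfile, outputfile):
    # Read the raw sales data and write a parsed copy
    with open(inputfile) as f:
        data = json.load(f)
    with open(outputfile, 'w') as f:
        json.dump(data, f)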
    

Airflow scheduling

schedule_interval :

  • How often to schedule the DAG, between the start_date and (optional) end_date
  • The first run happens at start_date + schedule_interval, i.e. one full interval after the start date

'start_date' : datetime(2020, 2, 25),
'schedule_interval' : '@daily'
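
schedule_interval accepts presets like '@daily' as well as raw cron strings. The cron fields, for reference (standard cron syntax; the same string appears in the example below):

# minute(0-59)  hour(0-23)  day-of-month(1-31)  month(1-12)  day-of-week(0-6, Sunday=0)
'schedule_interval' : '30 12 * * 3'   # 12:30 pm every Wednesday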

example

# Update the scheduling arguments as defined
from datetime import datetime, timedelta

default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# 12:30 pm every Wednesday
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')

Airflow sensors

What is a sensor?

  • An operator that waits for a certain condition to be true

Sensor arguments

  • mode : how to check for the condition
    • mode='poke' : the default; keep the task slot and check repeatedly
    • mode='reschedule' : give up the task slot and try again later
  • poke_interval : how long to wait between checks
  • timeout : how long to wait before failing the task

File sensor

: checks for the existence of a file at a certain location
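
A minimal FileSensor sketch (the file path and sales_dag are illustrative; the import path matches the one used in the pipeline example further below):

from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,
                              dag=sales_dag)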


Airflow executors

What is an executor?

  • Executors run tasks

kinds

  • SequentialExecutor

    • the default Airflow executor
    • Runs one task at a time
    • Useful for debugging
    • While functional, not really recommended for production
  • LocalExecutor

    • runs on a single system
    • treats tasks as processes
    • Parallelism defined by the user
    • can utilize all resources of a given host system
  • CeleryExecutor

    • uses a Celery backend as task manager
    • Multiple worker systems can be defined
    • Is significantly more difficult to set up & configure

What is Celery?
Celery: an asynchronous task/job queue built on distributed message passing.
Work no longer goes directly from task to worker; it flows task → broker → worker.

refs: https://jonnung.dev/python/2018/12/22/celery-distributed-task-queue/
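
Which executor is in use is set in airflow.cfg (a sketch; the exact path depends on the install):

[core]
executor = SequentialExecutor   # or LocalExecutor, CeleryExecutor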


SLAs

What is an SLA?

  • SLA stands for Service Level Agreement
  • Within Airflow, the amount of time a task or a DAG should take to run
  • An SLA miss is any time the task/DAG does not meet the expected timing

timedelta obj

  • timedelta(seconds = 30)
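
timedelta accepts any mix of units; a couple of valid forms:

from datetime import timedelta

timedelta(weeks=2)
timedelta(days=4, hours=10, minutes=20, seconds=30)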

Example

# Import the timedelta object
from datetime import datetime, timedelta

# schedule_interval=None means the DAG runs only when triggered manually
test_dag = DAG('test_workflow', start_date=datetime(2020,2,20), schedule_interval=None)

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours = 3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)
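
An sla in default_args applies to every task in the DAG (a sketch, same pattern as the production example further below):

default_args = {
    'sla': timedelta(minutes=20),
    'start_date': datetime(2020, 2, 20)
}

dag = DAG('sla_dag', default_args=default_args)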

General reporting

  • Email options for success, failure, and error are configured via keys in a DAG's default_args
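
For example, using the same keys as the scheduling example above (the address is illustrative):

default_args = {
    'email': ['airflowalerts@datacamp.com'],
    'email_on_failure': True,
    'email_on_retry': False
}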

Branching

  • Provides conditional logic: which tasks run next depends on a condition
  • Implemented with the BranchPythonOperator, whose python_callable returns the task_id of the next task to run

example

def branch_test(**kwargs):
    # 'ds_nodash' is the execution date as YYYYMMDD; even dates take the even branch
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)

start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2
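
Only the branch whose task_id is returned actually runs; tasks on the other branch are marked as skipped.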


Creating a production pipeline

Running DAGs & Tasks

airflow run <dag_id> <task_id> <date>
: running a specific task

airflow trigger_dag -e <date> <dag_id>
: running a full DAG
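
For example, against the etl_update DAG defined below (the date is illustrative):

airflow run etl_update sense_file 2019-01-01
airflow trigger_dag -e 2019-01-01 etl_update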

Pipeline execution example

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime

# Update the default arguments and apply them to the DAG
default_args = {
  'start_date': datetime(2019,1,1),
  'sla': timedelta(minutes=90)
}

dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

sensor >> bash_task >> python_task
Full pipeline example with branching and an email report

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

# Update the default arguments and apply them to the DAG.

default_args = {
  'start_date': datetime(2019,1,1),
  'sla': timedelta(minutes=90)
}
    
dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

# subject is a Jinja-templated field: {{ ds_nodash }} is the execution date as YYYYMMDD,
# and params.department is supplied via the params dict passed to the operator below
email_subject = """
  Email report for {{ params.department }} on {{ ds_nodash }}
"""

email_report_task = EmailOperator(task_id='email_report_task',
                                  to='sales@mycompany.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)

no_email_task = DummyOperator(task_id='no_email_task', dag=dag)

def check_weekend(**kwargs):
    # 'ds' is the execution date as a 'YYYY-MM-DD' string (strptime needs a string)
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    else:
        return 'no_email_task'
    
branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   python_callable=check_weekend,
                                   provide_context=True,
                                   dag=dag)

    
sensor >> bash_task >> python_task

python_task >> branch_task >> [email_report_task, no_email_task]