Start Jupyter Notebook bound to your local IP address:
jupyter notebook --ip=<local IP address>
Then write the DAG file below from a notebook cell:
%%writefile ./dags/tutorial.py
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
Define the default arguments to use when creating tasks as a dictionary:
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['yje14800@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
Create the DAG object that the tasks will be attached to:
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),  # midnight (UTC) two days ago; see the sketch below
    tags=['example'],
) as dag:
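An aside on start_date: days_ago(2) simply returns midnight (UTC) two days before now, so the scheduler sees the DAG as immediately runnable. A rough sketch of the value it produces (an illustration, not Airflow's actual implementation):

# Hypothetical stand-in for airflow.utils.dates.days_ago(n):
# midnight UTC, n days before now (shows the value, not Airflow's code).
from datetime import datetime, timedelta, timezone

def days_ago_sketch(n):
    midnight = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return midnight - timedelta(days=n)

print(days_ago_sketch(2))  # e.g. 2022-01-27 00:00:00+00:00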
Create the task instances:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml`, which get
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )
    dag.doc_md = __doc__  # provided that you have a docstring at the beginning of the DAG file
    dag.doc_md = """
    This is documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )
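To preview what this template expands to without a running Airflow instance, you can render it with plain Jinja2. A minimal sketch; ds, macros, and params here are stand-in values I'm assuming for what Airflow injects at runtime:

# Render templated_command (defined above) with fake context values.
from datetime import datetime, timedelta
from jinja2 import Template

def fake_ds_add(ds, days):
    # Stand-in mimicking macros.ds_add: shift a YYYY-MM-DD date string.
    return (datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)).strftime('%Y-%m-%d')

print(Template(templated_command).render(
    ds='2022-01-01',
    macros={'ds_add': fake_ds_add},
    params={'my_param': 'Parameter I passed in'},
))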
Add one more task:
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )
Dependencies can be declared with set_downstream/set_upstream or with the >> and << bit-shift operators. The DAG file itself only needs this one line:
    t1 >> [t2, t3]
The forms below are all equivalent ways of expressing the same dependencies:
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
List the tasks in the tutorial DAG:
airflow tasks list tutorial --tree

Test a single task instance for a given execution date:
airflow tasks test [dag_id] [task_id] [execution_date]
Command: airflow tasks test tutorial print_date 2022-01-01

Test a full DAG run for a given execution date:
airflow dags test [dag_id] [execution_date]
Command: airflow dags test tutorial 2022-01-01
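The same check can also be done from Python. A minimal sketch using DagBag, assuming it runs on the machine where the dags/ folder is configured:

# Parse the DAG folder and verify the tutorial DAG loads cleanly.
from airflow.models import DagBag

dag_bag = DagBag()                     # parses the configured dags folder
assert dag_bag.import_errors == {}     # broken DAG files show up here
dag = dag_bag.get_dag('tutorial')
print([t.task_id for t in dag.tasks])  # ['print_date', 'sleep', 'templated']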
What is a backfill? A backfill runs a DAG over a range of past dates, creating one DAG run per schedule interval in that range.
# start your backfill on a date range
airflow dags backfill tutorial \
--start-date 2022-01-01 \
--end-date 2022-01-07
If the output ends like this, the backfill succeeded:
[2022-01-29 18:42:11,902] {backfill_job.py:388} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-01-29 18:42:12,119] {local_executor.py:386} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2022-01-29 18:42:13,864] {backfill_job.py:831} INFO - Backfill done. Exiting.
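Related to backfill: by default (catchup=True) the scheduler itself backfills every missed schedule interval between start_date and now. If you only want runs going forward, set catchup=False on the DAG. A minimal sketch reusing the pieces above; the DAG id is hypothetical:

# catchup=False: skip past intervals instead of scheduling them all.
with DAG(
    'tutorial_no_catchup',             # hypothetical DAG id for illustration
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ...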
Hands-on tutorial reference: https://www.comtec.kr/2021/08/09/airflow-tutorial/
Airflow concepts reference: https://jwon.org/airflow-tips/
TaskFlow API tutorial: https://airflow.apache.org/docs/apache-airflow/2.1.4/tutorial_taskflow_api.html