- DAGs로 arrange 되고, Airflow의 execution unit이다.
- Operators
- 미리 정의된 task templat이다. (DAG의 task 대부분이 이러하다.)
- Sensors
- Operator의 특별한 subclass로 외부 이벤트가 발생하는 것에대해 기다리는 것이다.
- TaskFlow - @task
- python function을 task로 packaging 해주는 것이다.
Relationships
- task 사이 관계를 어떻게 정의해주냐가 핵심이다.
- dependencies, upstream, downstream etc..
first_task >> second_task >> [third_task, fourth_task]
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
- relationship 방법 2
- 보통 방법 1(bitshift) 를 추천한다. Easy 하니까
- Task들은 default로 서로 정보를 전달하지 않고, 독립적으로 run한다.
- information을 전달하고 싶다면, XComs를 사용해야된다 한다.
Task Instances
- docs에서 말하는 Task Instance의 states이다.
- 이상적으로는 none -> schduled -> queued -> running -> success라고 한다.
Relationship Terminology
task1 >> task2 >> task3
- DAG runs 시, 각 task에 대한 instance가 만들어진다. 하지만 같은 data interval을 가지고 있다.
- 같은 task임에도 다른 data interval을 가질 수도 있다.
Timeouts
- execution_timeout 속성을 Date.time.timedelta로 최대 runtime timeout을 지정할 수 있다.
- timeout 시 AirflowTaskTimeout이 raise 된다.
- Sensor에서는 reschdule mode일 때 timeout이 중요한데, 이는 sensor(task)가 성공할때까지 재스케줄링, 실행을 하는데 도움이 된다.
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(seconds=60),
timeout=3600,
retries=2,
mode="reschedule",
)
SLAs
- Service Level Agreement
- Task를 더이상 실행할 수 없으면 SLA Misses라는 것을 User Interface에 시각화 할 수 있다. 이메일로도 보낼 수 있다.
- 특정 runtime에 도달 이후 task를 취소하려면 Timouts 대신에 사용하면 된다.
- Setting 하기 위해서는 dateitme.timedelta object를 task/operator의 sla parameter로 전달해주면 된다.
- 추가로 sla_miss_callback을 적용한다.
- SLA missed 시 자신만의 logic을 처리할 수 있다.
- 스케줄링된 task만이 SLA check할 필요가 있다. 수동으로 triggered 하는 Task는 SLA miss를 포함하고 있지 않다.
sla_miss_callback
- SLA가 missed 됐을 때, 내가 정의한 logic이 호출된다.
- 5개 paramter를 요구한다.
- dag (SLA missing task를 포함하고 있는 Parent DAG Object)
- task_list (SLA를 missed할 모든 태스크에 대한 String list)
- blocking_task_list (같은 시점에 SUCCESS 상태에 진입하지 못한 DAGRuns안의 task
- slas (task_list 파라미터안에 있는 task와 관련된 SlaMiss List)
- blocking_tis (blocking_task_list 파라미터 안에 있는 task와 연관된 TaskInstance Object 목록)
def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
...
def my_sla_miss_callback(*args):
...
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print(
"The callback arguments are: ",
{
"dag": dag,
"task_list": task_list,
"blocking_task_list": blocking_task_list,
"slas": slas,
"blocking_tis": blocking_tis,
},
)
@dag(
schedule_interval="*/2 * * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
sla_miss_callback=sla_callback,
default_args={'email': "email@example.com"},
)
def example_sla_dag():
@task(sla=datetime.timedelta(seconds=10))
def sleep_20():
"""Sleep for 20 seconds"""
time.sleep(20)
@task
def sleep_30():
"""Sleep for 30 seconds"""
time.sleep(30)
sleep_20() >> sleep_30()
dag = example_sla_dag()
Zombie/Undead Tasks
- Airflow는 Zombie tasks를 알아서 클리닝해준다.
- Undead task는 running 중이지만 그렇지 않은 tasks이다. 보통 UI로 편집한 task들이 그렇다. Airflow는 그래도 알아서 찾아서 종료시켜준다.
Executor Configuartion
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)
- Optional configuration이다.
- image를 set해서 task를 실행하도록 해준다.