[Airflow] Tasks

JeongChaeJin·2022년 8월 17일
0
  • DAGs로 arrange 되고, Airflow의 execution unit이다.
    • Operators
      • 미리 정의된 task templat이다. (DAG의 task 대부분이 이러하다.)
    • Sensors
      • Operator의 특별한 subclass로 외부 이벤트가 발생하는 것에대해 기다리는 것이다.
    • TaskFlow - @task
      • python function을 task로 packaging 해주는 것이다.

Relationships

  • task 사이 관계를 어떻게 정의해주냐가 핵심이다.
    • dependencies, upstream, downstream etc..
first_task >> second_task >> [third_task, fourth_task]
  • realtionship 방법 1
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
  • relationship 방법 2
  • 보통 방법 1(bitshift) 를 추천한다. Easy 하니까
  • Task들은 default로 서로 정보를 전달하지 않고, 독립적으로 run한다.
    • information을 전달하고 싶다면, XComs를 사용해야된다 한다.

Task Instances

  • docs에서 말하는 Task Instance의 states이다.

  • 이상적으로는 none -> schduled -> queued -> running -> success라고 한다.

Relationship Terminology

task1 >> task2 >> task3
  • DAG runs 시, 각 task에 대한 instance가 만들어진다. 하지만 같은 data interval을 가지고 있다.
  • 같은 task임에도 다른 data interval을 가질 수도 있다.

Timeouts

  • execution_timeout 속성을 Date.time.timedelta로 최대 runtime timeout을 지정할 수 있다.
    • timeout 시 AirflowTaskTimeout이 raise 된다.
  • Sensor에서는 reschdule mode일 때 timeout이 중요한데, 이는 sensor(task)가 성공할때까지 재스케줄링, 실행을 하는데 도움이 된다.
sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

SLAs

  • Service Level Agreement
    • Task 실행 최대 회수
  • Task를 더이상 실행할 수 없으면 SLA Misses라는 것을 User Interface에 시각화 할 수 있다. 이메일로도 보낼 수 있다.
  • 특정 runtime에 도달 이후 task를 취소하려면 Timouts 대신에 사용하면 된다.
  • Setting 하기 위해서는 dateitme.timedelta object를 task/operator의 sla parameter로 전달해주면 된다.
    • 추가로 sla_miss_callback을 적용한다.
    • SLA missed 시 자신만의 logic을 처리할 수 있다.
  • 스케줄링된 task만이 SLA check할 필요가 있다. 수동으로 triggered 하는 Task는 SLA miss를 포함하고 있지 않다.

sla_miss_callback

  • SLA가 missed 됐을 때, 내가 정의한 logic이 호출된다.
  • 5개 paramter를 요구한다.
      1. dag (SLA missing task를 포함하고 있는 Parent DAG Object)
      1. task_list (SLA를 missed할 모든 태스크에 대한 String list)
      1. blocking_task_list (같은 시점에 SUCCESS 상태에 진입하지 못한 DAGRuns안의 task
      1. slas (task_list 파라미터안에 있는 task와 관련된 SlaMiss List)
    • blocking_tis (blocking_task_list 파라미터 안에 있는 task와 연관된 TaskInstance Object 목록)
def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    ...
def my_sla_miss_callback(*args):
    ...
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(
        "The callback arguments are: ",
        {
            "dag": dag,
            "task_list": task_list,
            "blocking_task_list": blocking_task_list,
            "slas": slas,
            "blocking_tis": blocking_tis,
        },
    )


@dag(
    schedule_interval="*/2 * * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    sla_miss_callback=sla_callback,
    default_args={'email': "email@example.com"},
)
def example_sla_dag():
    @task(sla=datetime.timedelta(seconds=10))
    def sleep_20():
        """Sleep for 20 seconds"""
        time.sleep(20)

    @task
    def sleep_30():
        """Sleep for 30 seconds"""
        time.sleep(30)

    sleep_20() >> sleep_30()


dag = example_sla_dag()

Zombie/Undead Tasks

  • Airflow는 Zombie tasks를 알아서 클리닝해준다.
  • Undead task는 running 중이지만 그렇지 않은 tasks이다. 보통 UI로 편집한 task들이 그렇다. Airflow는 그래도 알아서 찾아서 종료시켜준다.

Executor Configuartion

MyOperator(...,
    executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
    }
)
  • Optional configuration이다.
    • image를 set해서 task를 실행하도록 해준다.
profile
OnePunchLotto

0개의 댓글