Airflow - DAGs

Yeo Myung Ro·2022년 2월 10일
0

Apache-Airflow

목록 보기
2/2
  • DAG(Directed Acyclic Graph)는 Airflow의 핵심 컨셉
  • Task들 간의 dependency와 relationship을 통해 Task들을 어떻게 실행해야 하는지를 알려줌

DAG 선언

  1. context manager

    with DAG("dag_id1", start_date=days_ago(2), schedule_interval=None) as dag:
    	op = DummyOperator(task_id="task_id1")
  2. standard constructor

    first_dag = DAG("dag_id2", start_date=days_ago(2), schedule_interval=None)
    op = DummyOperator(task_id="task_id2", dag=first_dag)
  3. decorator

    @dag(start_date=days_ago(2), schedule_interval=None)
    def generate_dag():
    	op = DUmmyOperator(task_id="task_id3")
    
    dag = generate_dag()

    참고

Task 의존성

  • Task/Operator는 일반적으로 다른 Task들과 의존 관계를 갖음
  • Task들 간의 이런 의존 관계를 선언하는 것이 DAG 구조를 구성하는 것
  • 의존 관계를 표현하는 방법
    1. <<, >> 연산자를 사용

      first_task >> [second_task, third_task]
      third_task << fourth_task
    2. set_upstream(), set_downstream() 메소드 사용

      first_task.set_downstream(second_task, third_task)
      third_task.set_upstream(fourth_task)
    3. chain()

      from airflow.models.baseoperator import chain
      # Replaces
      # op1 >> op2 >> op4 >> op6
      # op1 >> op3 >> op5 >> op6
      cahin(op1, [op2, op3], [op4, op5], op6)
    4. cross_downstream(), 더 복잡한 의존 관계 표현

      from airflow.models.baseoperator import cross_downstream
      # Replaces
      # [op1, op2] >> op3
      # [op1, op2] >> op4
      cross_downstream([op1, op2], [op3, op4])

DAG 로딩

  • Airflow는 DAG_FOLDER 에서 python source file들을 각각 가져와 실행하고 DAG object로 로딩
  • python file에서 DAG를 로드할 때, 최상위 레벨의 DAG instance만 가져옴
    dag_1 = DAG('loaded')
    
    def my_func():
    	dag_2 = DAG('not_loaded')
    
    my_func()
    위 예시에서 dag_1 은 최상위 레벨(globals()) 의 DAG instance이기 때문에 로딩 되지만, dag_2는 로딩되지 않음
  • DAG_FOLDER 내의 .airflowignore 파일로 무시할 파일이나 subfolder 설정 가능

DAG 실행

  • API나 수동으로 실행
  • DAG에 정의된 스케줄에 의해 실행

Default Arguments

  • DAG 내의 많은 Operator들에게 동일한 arguments(eg. start_date)를 선언하기 위해 DAG에 default_args 로 전달
default_args = {
	'start_date': datetime(2022, 1, 1),
	'owner': 'airflow'
}

with DAG('dag_id', default_args=default_args) as dag:
	op = DummyOperator(task_id='dummy')

Control Flow

  • 기본적으로 Task는 앞 Task가 성공했을 때 실행되나, 여러 가지 변경이 가능
    • Branching : 조건에 따라 실행할 Task를 선택
    • Latest Only : Branching의 특별한 형태로, 최근 실행한 DAG만 실행
    • Depends On Past : 이전에 실행한 Task의 결과 여부에 따라 현재 실행 여부가 나뉨
    • Trigger Rules : Task를 실행할 조건 설정

Branching

  • 조건에 따라 실행할 Task를 선택
  • BranchPythonOperator 는PythonOperator와 유사하나(PythonOperator를 상속), python_callable 인자에서 호출 이후 task_id 또는 task_id의 list를 return
  • return한 task_id의 Task를 실행하고 다른 경로는 skip
  • 예시 코드
    from airflow import DAG
    from datetime import datetime
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator
    
    default_args = {
        'start_date': datetime(2022, 1, 15),
        'catchup': False,
        'owner': 'Airflow'
    }
    
    with DAG('control_flow', schedule_interval=None, default_args=default_args) as dag:
    
        def branch_func(ti):
            xcom_value = int(ti.xcom_pull(task_ids="start_task"))
            if xcom_value >= 5:
                return "continue_task"
            else:
                return "stop_task"
    
        start_op = BashOperator(
            task_id="start_task",
            bash_command="echo 5",
            do_xcom_push=True,
            dag=dag,
        )
    
        branch_op = BranchPythonOperator(
            task_id="branch_task",
            python_callable=branch_func,
            dag=dag,
        )
    
        continue_op = DummyOperator(task_id="continue_task", dag=dag)
        stop_op = DummyOperator(task_id="stop_task", dag=dag)
    
        start_op >> branch_op >> [continue_op, stop_op]

Latest Only

  • Airflow의 DAG는 backfill과 같이 현재 시점이 아닌 다른 시점에 실행될 수 있음
  • 이 때 특정 날짜가 실행되지 않기를 원하면 LatestOnlyOperator를 사용
  • 최근 실행한 DAG가 아닌 경우, 모든 downstream Task를 skip
  • 예시 코드
    import datetime as dt
    
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.latest_only import LatestOnlyOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    with DAG(
        dag_id='latest_only_with_trigger',
        schedule_interval=dt.timedelta(hours=4),
        start_date=dt.datetime(2021, 1, 1),
        catchup=False,
        tags=['example3'],
    ) as dag:
        latest_only = LatestOnlyOperator(task_id='latest_only')
        task1 = DummyOperator(task_id='task1')
        task2 = DummyOperator(task_id='task2')
        task3 = DummyOperator(task_id='task3')
        task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
    
        latest_only >> task1 >> [task3, task4]
        task2 >> [task3, task4]
    • task1은 latest_only 와 바로 의존관계가 있는 downstream이므로 최근 날짜의 실행이 아니면 skip
    • task2는 latest_only와 바로 의존관계가 있지 않으므로 과거 날짜도 실행
    • task3은 trigger rule이 default(ALL_SUCCESS)이므로 task1이 최근 날짜의 실행이 아니면 skip하여 task3도 skip하게 됨
    • task4는 trigger rule이 ALL_DONE으로 설정되었으므로 실행

Depends On Past

  • 이전 DAGRun에서 Task가 성공하거나 skip된 경우에만 다음 DAG의 Task가 실행
  • 이전에 실행한 Task의 결과 여부에 따라 현재 실행 여부가 나뉨
  • Operater의 파라미터로 depends_on_pastTrue로 설정

Trigger Rules

  • Operator의 파라미터로 trigger_rule=something 으로 설정
    • all_success(default)
    • all_failed
    • all_done
    • one_failed
    • one_success
    • none_failed
    • always
    • etc...

DAG & Task Documentation

  • Airflow UI에서 볼 수 있도록 DAG와 Task object에 documentation을 추가할 수 있음
  • DAG는 Graph & Tree View, Task는 Task Instance Details
with DAG(
    dag_id='Documentation_Example',
    schedule_interval=None,
    start_date=dt.datetime(2022, 1, 15),
    catchup=False,
) as dag:
    
    dag.doc_md = """
        # DAG Documentation
        DAG Explaination
    """
    task_documentation_example = DummyOperator(task_id='task_documentation_example')
    task_documentation_example.doc_md = """\
        # Task Documentation
        Task Explaination
        # """
    task_documentation_example

DAG Documentation in Tree View

DAG Documentation in Graph View

Task Documentation in Task Instance Detail

참고 자료
https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

Airflow decorator

Udemy - The Complete Hands-on Introduction to Apache Airflow

profile
# data engineering

0개의 댓글