[Airflow] custom operator(DAG run time check)

v K_Y v·2023년 11월 28일
0

AIRFLOW

목록 보기
4/5

컨셉

  • DAG 실행시간 모니터용 operator
  • 현재 DAG의 수행시간이 평균 DAG 수행시간보다 2배 이상 걸릴 경우 slack에 alert을 합니다.
    • 히스토리 DAG는 success 상태를 기준으로 합니다.

사용방법

  • CheckRunTimeOperator 를 DAG 파일에 import 후 가장 마지막 task로 지정합니다.
  • parameter
    • num_dags : 평균 실행시간 계산의 기준이 되는 DAG의 개수(최신순)

소스

  • CheckRunTimeOperator
from datetime import timedelta, datetime, timezone

import common.constant.Global
from airflow.models import BaseOperator
from airflow.models import DagRun

from airflow import settings



class CheckRunTimeOperator(BaseOperator):

    def __init__(
        self,
        num_dags: int = 3,
        *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.num_dags = num_dags

    def execute(self, context):
        dag_id = context['dag_run'].dag_id
        num_dags = self.num_dags
        session = settings.Session()
        last_runs = session.query(DagRun).filter(
            DagRun.dag_id == dag_id,
            DagRun.state == 'success'
        ).order_by(DagRun.execution_date.desc()).limit(num_dags).all()

        now_run = session.query(DagRun).filter(
            DagRun.dag_id == dag_id,
            DagRun.state == 'running'
        ).order_by(DagRun.execution_date.desc()).limit(1).all()

        if len(last_runs) < num_dags:
            return

        start_date = datetime.fromisoformat(str(now_run[0].start_date))
        duration = (datetime.now(timezone.utc) - start_date).seconds

        total_run_time = sum([run.end_date - run.start_date for run in last_runs], timedelta(0))
        avg_run_time = (total_run_time / num_dags).seconds

        if duration > avg_run_time * 2:
            context['avg_run_time'] = f'{avg_run_time // 60}m {avg_run_time % 60}s'
            context['duration'] = f'{duration // 60}m {duration % 60}s'
            # 슬랙 alert
  • TEST DAG
import os
from datetime import datetime

import pendulum
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from custom.dag.check_run_time import CheckRunTimeOperator

local_tz = pendulum.timezone('Asia/Seoul')

home_path = os.path.expanduser('~')

args = {
    'owner': 'owner',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 7, tzinfo=local_tz)
}


@dag(
    default_args=args,
    description='DAG의 실행시간 모니터 예시',
    schedule_interval='0 1 * * *',
    catchup=False,
    doc_md="""
        # **example_check_run_time**
        DAG의 실행시간 모니터 operator의 사용 예시입니다.
        * CheckRunTimeOperator
            * 최근 DAG(num_dags수) 평균 실행시간 보다 현재 DAG의 실행시간이 2배 이상 소요될 경우, slack에 warning alert을 합니다.
            * parameter
                * `task_id`(str) : task id 
                * `num_dags`(int) : 평균 실행시간을 계산할 최근 DAG의 개수(defult : 3)
    """
)
def example_check_run_time():

    def test_func():
        print('test func')

    test_operator = PythonOperator(
        task_id='test_operator',
        python_callable=test_func
    )

    check_time_operator = CheckRunTimeOperator(
        task_id='check_time_operator',
        num_dags=4
    )

    test_operator >> check_time_operator


dag = example_check_run_time()
profile
📌 기억하기 위해 남기는 기록들

0개의 댓글