컨셉
- DAG 실행시간 모니터용 operator
- 현재 DAG의 수행시간이 평균 DAG 수행시간보다 2배 이상 걸릴 경우 slack에 alert을 합니다.
- 히스토리 DAG는 success 상태를 기준으로 합니다.
사용방법
CheckRunTimeOperator
를 DAG 파일에 import 후 가장 마지막 task로 지정합니다.
- parameter
num_dags
: 평균 실행시간 계산의 기준이 되는 DAG의 개수(최신순)
소스
from datetime import timedelta, datetime, timezone
import common.constant.Global
from airflow.models import BaseOperator
from airflow.models import DagRun
from airflow import settings
class CheckRunTimeOperator(BaseOperator):
def __init__(
self,
num_dags: int = 3,
*args, **kwargs
):
super().__init__(*args, **kwargs)
self.num_dags = num_dags
def execute(self, context):
dag_id = context['dag_run'].dag_id
num_dags = self.num_dags
session = settings.Session()
last_runs = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.state == 'success'
).order_by(DagRun.execution_date.desc()).limit(num_dags).all()
now_run = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.state == 'running'
).order_by(DagRun.execution_date.desc()).limit(1).all()
if len(last_runs) < num_dags:
return
start_date = datetime.fromisoformat(str(now_run[0].start_date))
duration = (datetime.now(timezone.utc) - start_date).seconds
total_run_time = sum([run.end_date - run.start_date for run in last_runs], timedelta(0))
avg_run_time = (total_run_time / num_dags).seconds
if duration > avg_run_time * 2:
context['avg_run_time'] = f'{avg_run_time // 60}m {avg_run_time % 60}s'
context['duration'] = f'{duration // 60}m {duration % 60}s'
import os
from datetime import datetime
import pendulum
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from custom.dag.check_run_time import CheckRunTimeOperator
local_tz = pendulum.timezone('Asia/Seoul')
home_path = os.path.expanduser('~')
args = {
'owner': 'owner',
'depends_on_past': False,
'start_date': datetime(2023, 11, 7, tzinfo=local_tz)
}
@dag(
default_args=args,
description='DAG의 실행시간 모니터 예시',
schedule_interval='0 1 * * *',
catchup=False,
doc_md="""
# **example_check_run_time**
DAG의 실행시간 모니터 operator의 사용 예시입니다.
* CheckRunTimeOperator
* 최근 DAG(num_dags수) 평균 실행시간 보다 현재 DAG의 실행시간이 2배 이상 소요될 경우, slack에 warning alert을 합니다.
* parameter
* `task_id`(str) : task id
* `num_dags`(int) : 평균 실행시간을 계산할 최근 DAG의 개수(defult : 3)
"""
)
def example_check_run_time():
def test_func():
print('test func')
test_operator = PythonOperator(
task_id='test_operator',
python_callable=test_func
)
check_time_operator = CheckRunTimeOperator(
task_id='check_time_operator',
num_dags=4
)
test_operator >> check_time_operator
dag = example_check_run_time()