해당 문서는 Airflow 실패에 대한 모니터링을 위해 slack webhook을 이용해서 구축한 내용을 정리하기 위해 작성된 문서이다.
환경
- Ubuntu 20.04
- airflow 2.3.3
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
with DAG(
dag_id='test_slack_notification',
start_date=datetime(2022, 9, 3),
catchup=False,
schedule_interval="@daily",
tags=['slack_notification'],
) as dag:
run_this_first = EmptyOperator(
task_id='run_this_first',
)
success_job = BashOperator(
task_id='success_job',
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
) # 반드시 성공하는 job
failed_job = BashOperator(
task_id='failed_job',
bash_command="exit (1)"
) # 반드시 실패하는 job
run_this_first >> success_job >> failed_job
물론 App을 이용해서 하면 더 다양하게 진행할 수 있겠지만 간단하게 제작해서 모니터링 하기에는 Webhook도 나쁘지 않아보인다.
https://hooks.slack.com/services/T*********Z/B*********N/8*********************k
{"text" : "hello world! slack notification :thanks:"}
아래 정보를 다운받으면 된다.
pip install apache-airflow-providers-slack # 작성일 기준 버전 5.1.0
기본 적으로 아래와 같이 2가지가 있는데, 이 중 1번 방법으로 진행하고자 한다.
(이유는 2번에 대해서는 진행을 해보았고, 보편적으로 1번을 사용하는 것 같아서..)
Admin > Connection 에서 아래와 같이 새로운 거 생성
뭔가 Optional 하게 값을 추가 하고 싶었고, 아래와 같이 Parameter를 받게 해놓음
# slack_notification.py
from datetime import tzinfo
import pytz
from typing import List
from airflow.models.dag import DagContext
from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook
KST = pytz.timezone('Asia/Seoul')
def on_failure(
context: DagContext,
mention_user_list: List[str],
channel: str = 'airflow-monitoring'
) -> None:
"""
mention_user_list is None then mention here else mention list of user
"""
print(context.keys())
task_id = context.get('task_instance').task_id
dag_id = context.get('task_instance').dag_id
excution_date = context.get('execution_date')
excution_date_kst = excution_date.replace(tzinfo=KST) # KST (이 부분이 안먹음)
log_url = context.get('task_instance').log_url # for link log
mention_user = ','.join(map(lambda x : f'<@{x}>', mention_user_list)) \
if mention_user_list is not None else '<!here>'
message = f"""
:red_circle: Task Failed
- task_id : {task_id}
- dag_id : {dag_id}
- execution_date : {excution_date} (KST : {excution_date_kst})
- log_url : <{log_url}|link>
{mention_user}
"""
alert = SlackWebhookHook(
http_conn_id="slack_connection",
channel=channel,
username='airflow-failure-alert',
message = message
)
return alert.execute()
from datetime import datetime
from functools import partial
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from slack_notification import on_failure
# mention_user_list or channel
on_failure_args = {
"mention_user_list" : None,
}
default_args = {
"owner" : "2h-kim",
"on_failure_callback" : partial(on_failure, **on_failure_args)
}
with DAG(
dag_id='test_slack_notification',
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval="@daily",
default_args=default_args,
tags=['slack_notification'],
) as dag:
run_this_first = EmptyOperator(
task_id='run_this_first',
)
success_job = BashOperator(
task_id='success_job',
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
failed_job = BashOperator(
task_id='failed_job',
bash_command="exit (1)"
)
run_this_first >> success_job >> failed_job
Block을 이용하면 좀 예쁘게 정리할 수 있다. 하지만 아직 버튼에 대한 이벤트 등을 적용하는 방법을 모르기도 하고... ROI도 안나올 것 같아서 패스..