slack app 생성
1) 슬랙 로그인
2) 슬랙 워크스페이스 생성
3) 슬랙 앱 만들기 (슬랙 앱 URL: https://api.slack.com/apps)
4) 앱 설정의 Incoming Webhooks 메뉴에서 Incoming Webhooks를 On으로 변경
5) 맨 아래의 Add New Webhook to Workspace를 누르고 메시지를 보낼 채널에 대한 액세스 권한을 허용하면 Webhook URL이 생성됨 (생성된 URL은 저장해 둘 것)
등록대상 | 값 |
---|---|
Connection Id | conn_slack_airflow_bot |
Connection Type | Slack Incoming Webhook |
Slack Webhook Endpoint | https://hooks.slack.com/services |
Webhook Token | 5번에서 저장한 Webhook URL 중 https://hooks.slack.com/services/ 뒤에 오는, T로 시작하는 나머지 전체 (예: T…/B…/…) |
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def on_failure_callback_to_slack(context):
    """Send a Slack alert describing a failed Airflow task.

    Builds a two-section Block Kit message (a header naming the DAG and task,
    then the batch time and the error text) and posts it through the
    ``conn_slack_airflow_bot`` Slack webhook connection.

    Args:
        context: Callback context dict. The keys read are ``ti`` (used for
            ``dag_id`` and ``task_id``), ``exception`` and
            ``data_interval_end``.
    """
    # Slack's Block Kit reference documents a 2000-character limit for the
    # text of each item in a section's ``fields``; an over-long exception
    # message would otherwise get the whole alert rejected.
    field_text_limit = 2000

    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    err_msg = context.get('exception')

    # Guard against a missing interval end so the alert is still delivered
    # instead of the callback dying on ``None.in_timezone``.
    interval_end = context.get('data_interval_end')
    if interval_end is not None:
        batch_date = interval_end.in_timezone('Asia/Seoul')
    else:
        batch_date = 'N/A'

    err_field = f"*에러 내용*: {err_msg}"
    if len(err_field) > field_text_limit:
        # Keep the head of the message and mark that it was cut.
        err_field = err_field[:field_text_limit - 3] + "..."

    slack_hook = SlackWebhookHook(
        slack_webhook_conn_id='conn_slack_airflow_bot')

    # Plain-text fallback passed alongside the blocks.
    text = "실패 알람"
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{dag_id}.{task_id} 실패 알람*"
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*배치 시간*: {batch_date}"
                },
                {
                    "type": "mrkdwn",
                    "text": err_field
                }
            ]
        }
    ]

    slack_hook.send(text=text, blocks=blocks)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack
# Settings handed to every task in this DAG via default_args: a 60-second
# execution timeout and the Slack failure callback.
task_defaults = {
    'execution_timeout': timedelta(seconds=60),
    'on_failure_callback': on_failure_callback_to_slack,
}

with DAG(
    dag_id='dags_on_failure_callback_to_slack',
    schedule='0 * * * *',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    catchup=False,
    default_args=task_defaults,
) as dag:
    # Sleeps 90 seconds, which is longer than the 60-second
    # execution_timeout configured above.
    task_slp_90 = BashOperator(
        bash_command='sleep 90',
        task_id='task_slp_90',
    )

    # Runs `exit 1`; trigger_rule is 'all_done' rather than the default.
    task_ext_1 = BashOperator(
        task_id='task_ext_1',
        bash_command='exit 1',
        trigger_rule='all_done',
    )

    task_slp_90 >> task_ext_1