airflow slack 연동

yoon__0_0·2024년 6월 19일
0

이어드림 수업

목록 보기
76/103

slack app 생성

slack 사전작업

1) 슬랙 로그인
2) 슬랙 워크스페이스 생성
3) 슬랙 앱 만들기 슬랙 앱 url

  • from scratch 로 만들기

4) 설정에서 incoming Webhooks 를 on으로 변경
5) 맨아래의 Add New Webhook to Workspace 를 누르면 url 생성됨 + 엑세스 권한 설정

connection 등록

  • slack은 기본적으로 provider를 제공하고 그러면, hook도 제공해주기 때문에 더 편리하게 사용할 수 있음.
  • airflow connection에 들어가서 설정해주기
등록대상
Connection Idconn_slack_airflow_bot
Connection TypeSlack Incomming Webhook
Slack Webhook Endpointhttps://hooks.slack.com/services
Webhook Token5번에서 저장한 값 (T 뒤 모두)

연동 코드 작성

  • plugins/callbacks/on_failure_callback_to_slack.py
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def on_failure_callback_to_slack(context):
    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    err_msg = context.get('exception')
    batch_date = context.get('data_interval_end').in_timezone('Asia/Seoul')

    slack_hook = SlackWebhookHook(
        slack_webhook_conn_id='conn_slack_airflow_bot')
    text = "실패 알람"
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{dag_id}.{task_id} 실패 알람*"
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*배치 시간*: {batch_date}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*에러 내용*: {err_msg}"
                }
            ]
        }
    ]

    slack_hook.send(text=text, blocks=blocks)
  • dags_on_failure_callback_to_slack.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack


with DAG(
    dag_id='dags_on_failure_callback_to_slack',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule='0 * * * *',
    catchup=False,
    default_args={
        'on_failure_callback': on_failure_callback_to_slack,
        'execution_timeout': timedelta(seconds=60)
    }

) as dag:
    task_slp_90 = BashOperator(
        task_id='task_slp_90',
        bash_command='sleep 90',
    )

    task_ext_1 = BashOperator(
        trigger_rule='all_done',
        task_id='task_ext_1',
        bash_command='exit 1'
    )

    task_slp_90 >> task_ext_1

실행 결과

profile
신윤재입니다

0개의 댓글