Slack Webhook을 이용한 Airflow 모니터링 테스트

Log·2022년 9월 3일
2

문서 목적

해당 문서는 Airflow 실패에 대한 모니터링을 위해 slack webhook을 이용해서 구축한 내용을 정리하기 위해 작성된 문서이다.

문서 내용

환경
- Ubuntu 20.04
- airflow 2.3.3

AS-IS

Dag Script

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='test_slack_notification',
    start_date=datetime(2022, 9, 3),
    catchup=False,
    schedule_interval="@daily",
    tags=['slack_notification'],
) as dag:
    run_this_first = EmptyOperator(
        task_id='run_this_first',
    )

    success_job = BashOperator(
        task_id='success_job',
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    ) # 반드시 성공하는 job

    failed_job = BashOperator(
        task_id='failed_job',
        bash_command="exit (1)"
    ) # 반드시 실패하는 job

    run_this_first >> success_job >> failed_job

문제점

  • 실패했는지 직접 모니터링 하는 것이 아니면 발견하기 어렵다!
  • 성공된 Job에 대해서의 별도의 모니터링의 경우 이번엔 pass...

TO-BE

Slack Webhook 생성

물론 App을 이용해서 하면 더 다양하게 진행할 수 있겠지만 간단하게 제작해서 모니터링 하기에는 Webhook도 나쁘지 않아보인다.

  1. slack 설정 및 관리 > 앱 관리로 접속 > 사용자 지정 통합 앱 > 수신 웹 후크 > slack에 추가
    * 없는 경우가 있을 수도 있는데, 이 경우는 앱 설치 진행
  2. 원하는 설정에 따라 웹후크 생성! 생성이 완료되면 아래와 같은 형식의 URL이 생성 된다.
    https://hooks.slack.com/services/T*********Z/B*********N/8*********************k
  3. 잘 생성이 되었는지 테스트를 위해 Post-man 접속!
  4. 아래와 같은 조건으로 전송했을 때 잘 수신되면 우선 Webhook 생성 종료
    • header
      - Content-Type : application/json
    • Body
      {"text" : "hello world! slack notification :thanks:"}

Airflow에서 Webhook 전송을 위한 라이브러리 설치

아래 정보를 다운받으면 된다.

pip install apache-airflow-providers-slack # 작성일 기준 버전 5.1.0

Airflow에 Webhook 연결 방법

기본 적으로 아래와 같이 2가지가 있는데, 이 중 1번 방법으로 진행하고자 한다.
(이유는 2번에 대해서는 진행을 해보았고, 보편적으로 1번을 사용하는 것 같아서..)

  • Connection 생성하는 방식
  • Slack을 보내는 함수에서 외부 conf(환경 변수, conf_base 등)로 부터 가져오는 방식

Connection 생성

Admin > Connection 에서 아래와 같이 새로운 거 생성

On failure 함수 생성

뭔가 Optional 하게 값을 추가 하고 싶었고, 아래와 같이 Parameter를 받게 해놓음

Script File

# slack_notification.py
from datetime import tzinfo
import pytz
from typing import List
from airflow.models.dag import DagContext
from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook

KST = pytz.timezone('Asia/Seoul')

def on_failure(
    context: DagContext, 
    mention_user_list: List[str],
    channel: str = 'airflow-monitoring'
) -> None:
    """
    mention_user_list is None then mention here else mention list of user
    """
    print(context.keys())
    task_id = context.get('task_instance').task_id 
    dag_id = context.get('task_instance').dag_id
    excution_date = context.get('execution_date')
    excution_date_kst = excution_date.replace(tzinfo=KST) # KST (이 부분이 안먹음)
    log_url = context.get('task_instance').log_url # for link log
    mention_user = ','.join(map(lambda x : f'<@{x}>', mention_user_list)) \
        if mention_user_list is not None else '<!here>'
    message = f"""
    :red_circle: Task Failed
        - task_id : {task_id}
        - dag_id : {dag_id}
        - execution_date : {excution_date} (KST : {excution_date_kst})
        - log_url : <{log_url}|link> 
        {mention_user}       
    """
    alert = SlackWebhookHook(
        http_conn_id="slack_connection",
        channel=channel,
        username='airflow-failure-alert',
        message = message
    )
    return alert.execute()

Dag Script

from datetime import datetime
from functools import partial

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator

from slack_notification import on_failure

# mention_user_list or channel
on_failure_args = {
    "mention_user_list" : None,
}

default_args = {
    "owner" : "2h-kim",
    "on_failure_callback" : partial(on_failure, **on_failure_args)
}

with DAG(
    dag_id='test_slack_notification',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule_interval="@daily",
    default_args=default_args,
    tags=['slack_notification'],
) as dag:
    run_this_first = EmptyOperator(
        task_id='run_this_first',
    )

    success_job = BashOperator(
        task_id='success_job',
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    )

    failed_job = BashOperator(
        task_id='failed_job',
        bash_command="exit (1)"
    )

    run_this_first >> success_job >> failed_job  

결과

  • kst 부분 변환이 안된다.. 이 부분 좀 서치해봐야 할 듯

추가 정보

Block을 이용하면 좀 예쁘게 정리할 수 있다. 하지만 아직 버튼에 대한 이벤트 등을 적용하는 방법을 모르기도 하고... ROI도 안나올 것 같아서 패스..

참조 문서

profile
열심히 정리하는 습관 기르기..

0개의 댓글