airflow slack_sdk api 활용하여 알림설정

hyeok2·2023년 8월 11일
1

Airflow_stu

목록 보기
4/5
post-thumbnail

사용계기

  • 에어플로우 덱이 주기적으로 실행할 경우 실패하거나 성공한 내역을 쉽게 확인하기 위해 다음 설정을 한다.
  • 성공 내역과 실패 내역을 알림으로 확인할 수 있다.

1. slack 웹에서 설정하기

https://api.slack.com/

slack api 사용 설정

  • 일단 사용 하기위한 채널을 만들고 api 사이트로 들어간다.
  • get started를 누르고 create an App을 눌러준다.
  • app을 추가해준다.
  • From scratch
  • 원하는 채널을 추가해준다.

  • 인증과 권한 설정을 다음을 추가해준다.
app_mentions:read
View messages that directly mention moochu in conversations that the app is in

chat:write
Send messages as moochu

channels:join
Join public channels in a workspace

im:write
Start direct messages with people

incoming-webhook
Post messages to specific channels in Slack
  • 일반에 추가한다.

소켓모드 설정

  • OAuth 토큰을 복사한다. (에어플로우에 넣어야함)

2. slack과 airflow 설정

  • slack에서 위에서 설정해놓은곳에 앱을 추가한다.

slack_sdk 패키지를 설치

  • slack_sdk 패키지의 WebClient를 이용하기때문에 해당 패키지를 설치해줘야한다.
    - docker-compose 로 실행했기때문에 requirements에 추가한다.

slack_notifications.py 파일 설정

from slack_sdk import WebClient
from datetime import datetime

class SlackAlert:
	# 클래스 인스턴스 초기화
	# 채널정보와 slack 인증 토큰을 인자로 받음
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def success_msg(self, msg):
    	# 성공메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}
            alert : 
                Success! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
            """
        self.client.chat_postMessage(channel=self.channel, text=text)

    def fail_msg(self, msg):
    	# 실패메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}  
            alert : 
                Fail! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
        """
        self.client.chat_postMessage(channel=self.channel, text=text)   

3. 기존 덱파일에 추가하기

  • 기존 파일에 slack_notifications 에 정의해놓은 SlackAlert 를 임포트한다.
  • 그리고 airflow에서 미리 등록해놓은 api 토큰을 불러오기 위하여 Variable도 임포트한다.
  • 이후 기존 덱에 추가를 해준다.

from slack_notifications import SlackAlert
from airflow.models import Variable


slack_api_token = Variable.get('slack_api_token')
alert = SlackAlert('#선택한 채널명', slack_api_token)

작동확인

참고

https://velog.io/@jaekyu_lim/airflow-%EC%98%A4%EB%A5%98%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-slack

profile
땅을 파다보면 흙과 물을 보겠지만, 코드를 파다보면 답이 보일것이다.

0개의 댓글