[7월프로젝트] airflow 알림 메시지 보내기 - slack

임재규·2023년 8월 7일
1

프로젝트

목록 보기
5/11

slack_sdk를 활용하여 airflow 알리미 만들기

목표 : 프로젝트 중 무기 데이터의 집계를 하기 위한 데이터를 주기적으로 받아 GCP 스토리지에 저장해야 하는 작업이 필요했다. Raw Data를 받아와 우리가 사용할 데이터로 전처리하고 파싱하는 dag를 짰고, 이것이 잘 수행 중인지에 대해 slack bot을 통해 알림을 받고자 한다.

봇 만들기

  1. https://api.slack.com/ 사이트에서 로그인 후 Your apps

  1. Create New App 버튼 클릭 후 첫번째 클릭 (From scratch)

  1. 앱이름을 설정 한 후 본인이 사용할 워크스페이스를 선택

  1. 왼쪽 화면에서 권한설정하는 곳으로 가서 Scopes에서 권한을 부여한다.
  1. 소켓모드 활성화

    Slack API 소켓모드란?
    실시간 메시징 및 이벤트 기능을 구현하는 데 사용되는 기술로, 기본적으로 Slack API의 웹 소켓 연결을 통해 실기산으로 메시지 및 이벤트를 주고 받을 수 있게 해주며 이를 통해 애플리케이션은 더 빠르게 사용자와 상호작용하고, 이벤트를 감지하며, 실시간 업데이트를 제공
    소켓 모드를 사용하면 회사 방화벽 뒤에서 허용되지 않을 수 있는 공용 HTTP 끝점을 노출하지 않고 봇이 작업 공간에서 상호 작용이 가능

  1. 워크스페이스에 앱 설치
    처음 만들고 나면 Install to Workspace에 눌러 자신이 원하는 워크스페이스에 등록하면 된다.

    권한 수정과 같이 수정하였을 때는 Reinstall을 이용하여 수정 해주면 된다.

  2. 채널에 봇 추가하기

@봇이름 을 하여 채널에 참여 시킨다.

Airflow 세팅

airflow를 켠 후 admin - Variavles 설정

key값에 본인이 쓸 token 변수명을 적고, val에 토큰값을 입력 (xoxb로 시작하는 토큰값)

slack_nofitications.py 작성

여러 개의 dag에 쉽게 적용 시키기 위해서 slack_nofitications.py 작성

from slack_sdk import WebClient
from datetime import datetime

class SlackAlert:
	# 클래스 인스턴스 초기화
	# 채널정보와 slack 인증 토큰을 인자로 받음
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def success_msg(self, msg):
    	# 성공메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}
            alert : 
                Success! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
            """
        self.client.chat_postMessage(channel=self.channel, text=text)

    def fail_msg(self, msg):
    	# 실패메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}  
            alert : 
                Fail! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
        """
        self.client.chat_postMessage(channel=self.channel, text=text)

내 dag에 적용하기

앞서 slack_notifications.py를 이용하여 SlackAlert 함수 사용
토큰 값에 노출을 막기 위해 Variable 지정

Variable.get을 통해 지정해두었던 slack_api_token을 불러옴
SlackAlert에 첫째 인자는 메시지를 보낼 채널명을 입력하고, 두번째 인자에는 slack token을 입력

dag에 on_success_callback과 on_failure_callback에
각각 slack_notifications.py에서 만든 함수인 sucess_msg, fail_msg를 넣어준다.

성공화면

message 채널에 notice-bot이 성공 여부를 알리는 메시지를 보내준다.

Raw data를 분석하기 위해 데이터 파싱 및 전처리를 한 후 파케이 파일로 logs_weapon에 넣는 것이 목적
logs_weapon파일에 가보니 잘 들어온 것을 확인할 수 있었음
gcp 스토리지에도 파케이 파일로 잘 들어온 모습

파케이 파일을 꺼내서 본 모습

참고 blog
https://www.twilio.com/blog/how-to-build-a-slackbot-in-socket-mode-with-python

profile
공부 기록

1개의 댓글

comment-user-thumbnail
2023년 8월 7일

유익한 자료 감사합니다.

답글 달기