사용계기
- 에어플로우 덱이 주기적으로 실행할 경우 실패하거나 성공한 내역을 쉽게 확인하기 위해 다음 설정을 한다.
- 성공 내역과 실패 내역을 알림으로 확인할 수 있다.
1. slack 웹에서 설정하기
https://api.slack.com/
slack api 사용 설정
- 일단 사용 하기위한 채널을 만들고 api 사이트로 들어간다.
- get started를 누르고 create an App을 눌러준다.
- app을 추가해준다.
- From scratch
- 원하는 채널을 추가해준다.

app_mentions:read
View messages that directly mention moochu in conversations that the app is in
chat:write
Send messages as moochu
channels:join
Join public channels in a workspace
im:write
Start direct messages with people
incoming-webhook
Post messages to specific channels in Slack
소켓모드 설정

- OAuth 토큰을 복사한다. (에어플로우에 넣어야함)
2. slack과 airflow 설정
- slack에서 위에서 설정해놓은곳에 앱을 추가한다.
slack_sdk 패키지를 설치
- slack_sdk 패키지의 WebClient를 이용하기때문에 해당 패키지를 설치해줘야한다.
- docker-compose 로 실행했기때문에 requirements에 추가한다.
slack_notifications.py 파일 설정
from slack_sdk import WebClient
from datetime import datetime
class SlackAlert:
def __init__(self, channel, token):
self.channel = channel
self.client = WebClient(token=token)
def success_msg(self, msg):
text = f"""
date : {datetime.today().strftime('%Y-%m-%d')}
alert :
Success!
task id : {msg.get('task_instance').task_id},
dag id : {msg.get('task_instance').dag_id},
log url : {msg.get('task_instance').log_url}
"""
self.client.chat_postMessage(channel=self.channel, text=text)
def fail_msg(self, msg):
text = f"""
date : {datetime.today().strftime('%Y-%m-%d')}
alert :
Fail!
task id : {msg.get('task_instance').task_id},
dag id : {msg.get('task_instance').dag_id},
log url : {msg.get('task_instance').log_url}
"""
self.client.chat_postMessage(channel=self.channel, text=text)
3. 기존 덱파일에 추가하기
- 기존 파일에 slack_notifications 에 정의해놓은 SlackAlert 를 임포트한다.
- 그리고 airflow에서 미리 등록해놓은 api 토큰을 불러오기 위하여 Variable도 임포트한다.
- 이후 기존 덱에 추가를 해준다.

from slack_notifications import SlackAlert
from airflow.models import Variable
slack_api_token = Variable.get('slack_api_token')
alert = SlackAlert('#선택한 채널명', slack_api_token)
작동확인

참고
https://velog.io/@jaekyu_lim/airflow-%EC%98%A4%EB%A5%98%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-slack