airflow dag에서 오류가 발생했을 때 내가 직접 airflow ui에 들어가서 확인하지 않으면 실시간으로 오류의 발생을 확인할 수 없다. 실시간으로 DAG의 오류를 추적하기 위해 slack으로 알림을 주는 과정에 대해서 정리한다.
[ 대략적인 개요 ]
1. Slack에서 사용할 workspace에 실시간으로 오류를 보여줄 App을 만든다.
2. Slack과 Airflow를 연동할 수 있는 함수를 만든다.
3. DAG에 default_args
항목에 {'on_failure_callback':연동함수}
를 추가한다.
이 과정을 위해서 내가 원하는 workspace와 channel을 만들어둔다.
Create an App
을 클릭한다.From scratch
를 선택한다.Create App
을 눌러 App을 생성한다.Incoming Webhooks
선택하기Active Incoming Webhooks
의 off를 클릭해서 on으로 변경한다. 그러면 아래에 창이 생긴다.Sample curl request
는 이런 방식으로 내 슬랙에 요청을 보내면 된다고 하는 예시이다. YOUR_WEBHOOK_URL HERE
부분에 내 슬랙 채널의 webhook url을 넣으면 되는데 지금은 아직 생성이 되지 않았기 때문에 Add New Webhook to Workspace
버튼을 클릭해 생성한다.허용
을 클릭한다.YOUR_WEBHOOK_URL HERE
위치(빨간색 박스)에 내 webhook url 이 생성되어 있는 것을 볼 수 있다.Sample curl request
를 커맨드창에서 실행하여 제대로 slack에서 작동하는지 확인한다.내가 넣어줄 on_failure_callback
는 에러가 나면 부여받은 함수에 error 내용을 자동으로 넣어서 넘겨준다. 그렇기 때문에 연동 함수에는 넘겨받은 error를 분해해서 사용하기만 하면 된다.
def on_failure_callback(context):
# 1번. text 정리
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# 2번. slack으로 보내기
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Danee", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
DAG 를 생성할 때 default_args
인자를 주고 그 안에 on_failure_callback
인자를 추가한다.
에러가 나면 아래의 캡처처럼 알림이 온다.
너무...너무 신기하다!!!! on_failure_callback
의 인자로 받는 context를 뜯어보고 싶다. 그 안에 더 알찬 내용이 있는지 확인하고 나만의 text로 변경해보고 싶다!