airflow slack 연동하기

박단이·2024년 1월 3일
0

Airflow

목록 보기
1/2

airflow dag에서 오류가 발생했을 때 내가 직접 airflow ui에 들어가서 확인하지 않으면 실시간으로 오류의 발생을 확인할 수 없다. 실시간으로 DAG의 오류를 추적하기 위해 slack으로 알림을 주는 과정에 대해서 정리한다.

[ 대략적인 개요 ]
1. Slack에서 사용할 workspace에 실시간으로 오류를 보여줄 App을 만든다.
2. Slack과 Airflow를 연동할 수 있는 함수를 만든다.
3. DAG에 default_args 항목에 {'on_failure_callback':연동함수}를 추가한다.

1) Slack workspace에 App 만들기

이 과정을 위해서 내가 원하는 workspace와 channel을 만들어둔다.

  1. https://api.slack.com/apps 페이지로 이동한다.
  2. Create an App을 클릭한다.
    App 1
  3. 우리는 App을 처음부터 만들 것이기 때문에 From scratch를 선택한다.
    App 2
  4. 원하는 App의 이름을 작성하고 사용할 workspace를 선택한다. Create App을 눌러 App을 생성한다.
    App 3
  5. Incoming Webhooks 선택하기
    App 4
  6. Active Incoming Webhooks의 off를 클릭해서 on으로 변경한다. 그러면 아래에 창이 생긴다.
    App 5
  7. Sample curl request는 이런 방식으로 내 슬랙에 요청을 보내면 된다고 하는 예시이다. YOUR_WEBHOOK_URL HERE부분에 내 슬랙 채널의 webhook url을 넣으면 되는데 지금은 아직 생성이 되지 않았기 때문에 Add New Webhook to Workspace 버튼을 클릭해 생성한다.
    App 6
  8. 원하는 채널을 선택하고 허용을 클릭한다.
    App 7
  9. 다시 앞의 화면으로 돌아가서YOUR_WEBHOOK_URL HERE위치(빨간색 박스)에 내 webhook url 이 생성되어 있는 것을 볼 수 있다.
    App 8
  10. Sample curl request를 커맨드창에서 실행하여 제대로 slack에서 작동하는지 확인한다.
    App 9
  11. 빨간색 박스를 복사하여 airflow Variables에 추가한다.
    App 10

2) 연동 함수 만들기

내가 넣어줄 on_failure_callback는 에러가 나면 부여받은 함수에 error 내용을 자동으로 넣어서 넘겨준다. 그렇기 때문에 연동 함수에는 넘겨받은 error를 분해해서 사용하기만 하면 된다.

  1. 넘겨받은 error를 어떤 식으로 slack에서 보여줄 것인지 텍스트를 정리한다.
  2. 만들어진 텍스트를 slack에 보낸다.
def on_failure_callback(context):
	# 1번. text 정리
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")
    
    # 2번. slack으로 보내기
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Danee", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    
    return r

3) DAG에 함수 사용하기

DAG 를 생성할 때 default_args인자를 주고 그 안에 on_failure_callback 인자를 추가한다.
dag 1

에러가 나면 아래의 캡처처럼 알림이 온다.
dag 2


너무...너무 신기하다!!!! on_failure_callback의 인자로 받는 context를 뜯어보고 싶다. 그 안에 더 알찬 내용이 있는지 확인하고 나만의 text로 변경해보고 싶다!

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글