baseoperator 의 on_failure_callback 이용
~callback : 특정 조건이 되었을때 만들었던 함수를 실행시켜라
Task 실패시 알람을 받기 위한 KakaoTalk 연동
앱설정
1) 카카오 Developer 로그인
카카오 Developer 링크
2) 앱만들기
회원가입 -> 내 애플리케이션 -> 애플리케이션 추가하기
3) 카카오 로그인 활성화
4) Redirect URL 등록
5) 동의항목 설정
인가코드 받기
https://kauth.kakao.com/oauth/authorize?response_type=code&client_id={client_id}&redirect_uri={redirect_uri}&response_type=code
작성후 브라우저 열기
로그인이 필요하면 로그인하고 동의하기
이 화면이 나오면 위쪽 url에 oauth?code= ~~~ 하는 부분이 나옴 그게 인가코드
토큰 받기
토큰받는 코드 작성
import requests
client_id = '{client_id}'
redirect_uri = 'https://example.com/oauth'
authorize_code = '{authorize_code}'
token_url = 'https://kauth.kakao.com/oauth/token'
data = {
'grant_type': 'authorization_code',
'client_id': client_id,
'redirect_uri': redirect_uri,
'code': authorize_code,
}
response = requests.post(token_url, data=data)
tokens = response.json()
print(tokens)
실행시 access_token 이 발급됨(dict 구조)
-> 이를 어딘가에 복사해 놓기
값 저장하기
airflow variable 에 저장
소스코드 작성
import pendulum
import os
import json
import requests
from airflow.models import Variable
REDIRECT_URL = 'https://example.com/oauth'
def _refresh_token_to_variable():
client_id = Variable.get("kakao_client_secret")
tokens = eval(Variable.get("kakao_tokens"))
refresh_token = tokens.get('refresh_token')
url = "https://kauth.kakao.com/oauth/token"
data = {
"grant_type": "refresh_token",
"client_id": f"{client_id}",
"refresh_token": f"{refresh_token}"
}
response = requests.post(url, data=data)
rslt = response.json()
new_access_token = rslt.get('access_token')
# Refresh 토큰 만료기간이 30일 미만이면 refresh_token 값이 포함되어 리턴됨.
new_refresh_token = rslt.get('refresh_token')
if new_access_token:
tokens['access_token'] = new_access_token
if new_refresh_token:
tokens['refresh_token'] = new_refresh_token
now = pendulum.now('Asia/Seoul').strftime('%Y-%m-%d %H:%M:%S')
tokens['updated'] = now
# airflow variables 업데이트 해주기
os.system(f'airflow variables set kakao_tokens "{tokens}"')
print('variable 업데이트 완료(key: kakao_tokens)')
def send_kakao_msg(talk_title: str, content: dict):
'''
content:{'tltle1':'content1', 'title2':'content2'...}
'''
try_cnt = 0
while True:
# get Access 토큰
tokens = eval(Variable.get("kakao_tokens"))
access_token = tokens.get('access_token')
content_lst = []
button_lst = []
for title, msg in content.items():
content_lst.append({
'title': f'{title}',
'description': f'{msg}',
'image_url': '',
'image_width': 40,
'image_height': 40,
'link': {
'web_url': '',
'mobile_web_url': ''
}
})
button_lst.append({
'title': '',
'link': {
'web_url': '',
'mobile_web_url': ''
}
})
list_data = {
'object_type': 'list',
'header_title': f'{talk_title}',
'header_link': {
'web_url': '',
'mobile_web_url': '',
'android_execution_params': 'main',
'ios_execution_params': 'main'
},
'contents': content_lst,
'buttons': button_lst
}
send_url = "https://kapi.kakao.com/v2/api/talk/memo/default/send"
headers = {
"Authorization": f'Bearer {access_token}'
}
data = {'template_object': json.dumps(list_data)}
response = requests.post(send_url, headers=headers, data=data)
print(f'try횟수: {try_cnt}, reponse 상태:{response.status_code}')
try_cnt += 1
if response.status_code == 200: # 200: 정상
return response.status_code
# 400: Bad Request (잘못 요청시), 무조건 break 하도록 return
elif response.status_code == 400:
return response.status_code
# 401: Unauthorized (토큰 만료 등)
elif response.status_code == 401 and try_cnt <= 2:
_refresh_token_to_variable()
elif response.status_code != 200 and try_cnt >= 3: # 400, 401 에러가 아닐 경우 3회 시도때 종료
return response.status_code
from apis.kakao_api import send_kakao_msg
def on_failure_callback_to_kakao(context):
exception = context.get('exception') or 'exception 없음'
ti = context.get('ti')
dag_id = ti.dag_id
task_id = ti.task_id
data_interval_end = context.get(
'data_interval_end').in_timezone('Asia/Seoul')
content = {f'{dag_id}.{task_id}': f'에러내용: {
exception}', '': ''} # Content 길이는 2 이상
send_kakao_msg(talk_title=f'task 실패 알람({data_interval_end})',
content=content)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_kakao import on_failure_callback_to_kakao
with DAG(
dag_id='dags_on_failure_callback_to_kakao',
start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
schedule='*/20 * * * *',
catchup=False,
default_args={
'on_failure_callback': on_failure_callback_to_kakao,
'execution_timeout': timedelta(seconds=60)
}
) as dag:
task_slp_90 = BashOperator(
task_id='task_slp_90',
bash_command='sleep 90',
)
task_ext_1 = BashOperator(
trigger_rule='all_done',
task_id='task_ext_1',
bash_command='exit 1'
)
task_slp_90 >> task_ext_1
결과