airflow 카카오 연동

yoon__0_0·2024년 6월 19일
0

이어드림 수업

목록 보기
75/103

baseoperator 의 on_failure_callback 이용
~callback : 특정 조건이 되었을때 만들었던 함수를 실행시켜라

Task 실패시 알람을 받기 위한 KakaoTalk 연동

kakao 준비


앱설정

1) 카카오 Developer 로그인
카카오 Developer 링크

2) 앱만들기
회원가입 -> 내 애플리케이션 -> 애플리케이션 추가하기

3) 카카오 로그인 활성화

  • 앱이 카카오 로그인 할 수 있도록 하는 것

4) Redirect URL 등록

  • 다른 app 에서 인증정보를 요청시 카카오에서 토큰값을 주게 되는데, 그걸 어디로 받을지에 대한 등록
  • 우리의 app url 을 넣어주면 되는데, 우리는 따로 없으니까 https://example.com/oauth 이거 넣어주기

5) 동의항목 설정

  • 카카오톡 메시지 전송 -> 선택동의

인가코드 받기

https://kauth.kakao.com/oauth/authorize?response_type=code&client_id={client_id}&redirect_uri={redirect_uri}&response_type=code

작성후 브라우저 열기
로그인이 필요하면 로그인하고 동의하기


이 화면이 나오면 위쪽 url에 oauth?code= ~~~ 하는 부분이 나옴 그게 인가코드


토큰 받기

토큰받는 코드 작성


import requests

client_id = '{client_id}'
redirect_uri = 'https://example.com/oauth'
authorize_code = '{authorize_code}'


token_url = 'https://kauth.kakao.com/oauth/token'
data = {
    'grant_type': 'authorization_code',
    'client_id': client_id,
    'redirect_uri': redirect_uri,
    'code': authorize_code,
    }

response = requests.post(token_url, data=data)
tokens = response.json()
print(tokens)
  • client id : 앱키 -> restAPI 키 복사
  • authorize_code : 인가코드

실행시 access_token 이 발급됨(dict 구조)
-> 이를 어딘가에 복사해 놓기

  • 토큰은 생명주기가 6시간이며 계속 변경해줘야한다. 여기서 필요한게 refresh_token
  • refresh_token의 유효기간은 60일
  • refresh_token도 refresh_token으로 갱신 가능

값 저장하기

airflow variable 에 저장

  • key : kakao_client_secret / value : {kakao client id }
  • key : kakao_tokens / value : 아까 복사해놓은거 전체

airflow 연동

소스코드 작성

  • plugins/apis/kakao_api.py
import pendulum
import os
import json
import requests
from airflow.models import Variable

REDIRECT_URL = 'https://example.com/oauth'


def _refresh_token_to_variable():
    client_id = Variable.get("kakao_client_secret")
    tokens = eval(Variable.get("kakao_tokens"))
    refresh_token = tokens.get('refresh_token')
    url = "https://kauth.kakao.com/oauth/token"
    data = {
        "grant_type": "refresh_token",
        "client_id": f"{client_id}",
        "refresh_token": f"{refresh_token}"
    }
    response = requests.post(url, data=data)
    rslt = response.json()
    new_access_token = rslt.get('access_token')
    # Refresh 토큰 만료기간이 30일 미만이면 refresh_token 값이 포함되어 리턴됨.
    new_refresh_token = rslt.get('refresh_token')
    if new_access_token:
        tokens['access_token'] = new_access_token
    if new_refresh_token:
        tokens['refresh_token'] = new_refresh_token

    now = pendulum.now('Asia/Seoul').strftime('%Y-%m-%d %H:%M:%S')
    tokens['updated'] = now
    # airflow variables 업데이트 해주기
    os.system(f'airflow variables set kakao_tokens "{tokens}"')
    print('variable 업데이트 완료(key: kakao_tokens)')


def send_kakao_msg(talk_title: str, content: dict):
    '''
    content:{'tltle1':'content1', 'title2':'content2'...}
    '''

    try_cnt = 0
    while True:
        # get Access 토큰
        tokens = eval(Variable.get("kakao_tokens"))
        access_token = tokens.get('access_token')
        content_lst = []
        button_lst = []

        for title, msg in content.items():
            content_lst.append({
                'title': f'{title}',
                'description': f'{msg}',
                'image_url': '',
                'image_width': 40,
                'image_height': 40,
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''
                }
            })
            button_lst.append({
                'title': '',
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''
                }
            })

        list_data = {
            'object_type': 'list',
            'header_title': f'{talk_title}',
            'header_link': {
                'web_url': '',
                'mobile_web_url': '',
                'android_execution_params': 'main',
                'ios_execution_params': 'main'
            },
            'contents': content_lst,
            'buttons': button_lst
        }

        send_url = "https://kapi.kakao.com/v2/api/talk/memo/default/send"
        headers = {
            "Authorization": f'Bearer {access_token}'
        }
        data = {'template_object': json.dumps(list_data)}
        response = requests.post(send_url, headers=headers, data=data)
        print(f'try횟수: {try_cnt}, reponse 상태:{response.status_code}')
        try_cnt += 1

        if response.status_code == 200:         # 200: 정상
            return response.status_code
        # 400: Bad Request (잘못 요청시), 무조건 break 하도록 return
        elif response.status_code == 400:
            return response.status_code
        # 401: Unauthorized (토큰 만료 등)
        elif response.status_code == 401 and try_cnt <= 2:
            _refresh_token_to_variable()
        elif response.status_code != 200 and try_cnt >= 3:      # 400, 401 에러가 아닐 경우 3회 시도때 종료
            return response.status_code
  • plugins/callbacks/on_failure_callback_to_kakao.py
from apis.kakao_api import send_kakao_msg


def on_failure_callback_to_kakao(context):
    exception = context.get('exception') or 'exception 없음'
    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    data_interval_end = context.get(
        'data_interval_end').in_timezone('Asia/Seoul')

    content = {f'{dag_id}.{task_id}': f'에러내용: {
        exception}', '': ''}      # Content 길이는 2 이상
    send_kakao_msg(talk_title=f'task 실패 알람({data_interval_end})',
                   content=content)
  • dags/dags_on_failure_callback_to_kakao.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_kakao import on_failure_callback_to_kakao


with DAG(
    dag_id='dags_on_failure_callback_to_kakao',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule='*/20 * * * *',
    catchup=False,
    default_args={
        'on_failure_callback': on_failure_callback_to_kakao,
        'execution_timeout': timedelta(seconds=60)
    }

) as dag:
    task_slp_90 = BashOperator(
        task_id='task_slp_90',
        bash_command='sleep 90',
    )

    task_ext_1 = BashOperator(
        trigger_rule='all_done',
        task_id='task_ext_1',
        bash_command='exit 1'
    )

    task_slp_90 >> task_ext_1

결과

profile
신윤재입니다

0개의 댓글