자동으로 원하는 시간에 돌아가도록 만든 DAG는 에러가 발생해도 직접 Airflow에 접속해서 일일이 확인하지 않는다면 에러가 발생했는지 알 수가 없다. 에러가 발생했을 때 실시간으로 알 수 있는 방법으로 Slack과 연동하는 방법이 있다.
전반적인 순서는 아래와 같다.
default_args
인자에 {'on_failure_callback':slack연동함수}
를 설정한다.자세한 내용 : https://velog.io/@qkr471/airflow-slack-연동하기
Airflow는 전적으로 데이터 엔지니어의 영역이다. 하지만 Data는 비개발자와 개발자 모두 보기 편해야하고 데이터를 실시간으로 처리 되길 원하는 것도 모두다 원한다. 비개발자, 개발자 모두 편한 구글 스프레드시트의 데이터를 airflow와 연동하여 바로 DB에 적재하고 DB에 있는 내용을 바로 구글 스프레드시트로 뿌려준다면 사용자의 니즈를 충족시킬 수 있을 것이다.
전반적인 순서는 아래와 같다.
자세한 내용 : https://velog.io/@qkr471/Airflow-구글-스프레드시트-연동
지금까지 Airlfow 조작을 CLI와 Web UI에서 진행했다. Airflow에도 API를 제공해서 이 기능을 사용할 수도 있다.
하지만 airflow의 내용은 기밀사항을 아주아주 많이 담고 있기 때문에 VPN 안에서 진행하길 바란다.
우선, API를 활성화 시켜야 한다. 방법은 2가지가 있다.
airflow.cfg
수정하기api
섹션에서 auth_backend
값을 변경한다.[API]
auth_backend=airflow.api.auth.backend.basic_auth
docker-compose.yml
수정하기airflow.cfg
를 docker-compose
단계에서 override하는 방법이다.AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
으로 설정한다.docker-compose
에서 airflow.cfg
를 override하고 싶다면 AIRFLOW__section__key
를 사용한다.이렇게 API를 활성화 시키고 나면 Airflow web ui에 가서 사용자를 추가한다. Security > List Users > +
를 눌러서 권한으로 User
을 주고 이 사용자를 통해 API를 사용한다.
curl -X GET --user "airflow:airflow" http://localhost:8080/health
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d
'{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/dag-id/dagRuns"
is_active
는 그 DAG의 존재 유무에 대한 내용이고 is_paused
는 DAG의 활성화 유무에 대한 내용이다. 헷갈리지 말 것!curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
docker-compose
를 통해 등록한 환경 변수들은 보이지 않는다.curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/connections
airflow.cfg
내용을 모두 표출해주는 API이기 때문에 환경설정을 하나 더 해줘야한다.airflow.cfg
에서 webserver
섹션을 찾아서 expose_config = True
로 수정한다. 이 방법은 아주아주 보안에 취약하므로 되도록 사용 Xcurl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
이 정보들은 날라가면 안되는 정보들이기 때문에 주기적으로 Meta DB를 백업해주던지 CLI를 통해 만들어진 파일을 백업한다.
$ airflow variables export variables.json
$ airflow variables import variables.json
$ airflow connections export connections.json
$ airflow connections import connections.json
이번 숙제가 Python을 사용하여 활성화된 dag만 확인할 수 있는 코드를 짜는 것이라서 내 TIL에 올려본다,
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth('airflow', 'airflow'))
dags = dags.json()['dags']
for dag in dags:
if not dag['is_paused']:
print(dag['dag_id'])
airflow와 slack을 연결하는 것은 과정 하나하나가 너무 신기했다. 이렇게 세상이 좋아졌다니! 하면서 박수를 혼자 짝짝 쳐대며 실습을 했고, 찾아보니 on_success_callback
함수도 적용할 수 있는 것 같다. 넘겨지는 context에 어떤 내용이 들어가는지 까보고 적용해봐야겠다.
구글 스프레드와 연동하는 것은 목차에서 보고 '엥? 필요한 기술이라고?' 라는 생각을 했다. 하지만 교육을 모두 듣고 나서는 이게 굉장히 유용한 기술이라는 것을 알게됐다. 비개발자들과 개발자들의 경계를 넘나드는 엑셀... 정말 누가 만들었는지 모르겠지만 이 분은 이렇게 잘 쓰는 우리들을 보며 행복하시지 않을까? 생각이 든다.